In [72]:
import pyspark

In [73]:
from pyspark.sql import *
from pyspark.sql import SparkSession

In [74]:
import json

In [75]:
import hashlib

In [76]:
f = open('donation_np.json')
data = json.load(f)

In [77]:
spark = SparkSession \
    .builder \
    .appName("Python Spark Donation ") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [78]:
df = spark.read.json("donation_np.json")
df.printSchema()
df.show()

root
 |-- Address: string (nullable = true)
 |-- Amount: string (nullable = true)
 |-- Contribution Mode: string (nullable = true)
 |-- Financial Year: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- PAN Given: string (nullable = true)
 |-- Party: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- _corrupt_record: string (nullable = true)
 |-- field10: string (nullable = true)
 |-- field11: string (nullable = true)
 |-- field12: string (nullable = true)
 |-- field13: string (nullable = true)
 |-- field14: string (nullable = true)
 |-- field9: string (nullable = true)

+--------------------+--------+--------------------+--------------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+
|             Address|  Amount|   Contribution Mode|Financial Year|                Name|PAN Given| Party|  Type|_corrupt_record|field10|field11|field12|field13|field14|field9|
+--------------------+--------+---------

In [79]:
df = df.withColumnRenamed("Contribution Mode","mode_of_payment").withColumnRenamed("Financial Year","fin_year").withColumnRenamed("PAN Given","pan_given")

In [80]:
df.show()

+--------------------+--------+--------------------+--------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+
|             Address|  Amount|     mode_of_payment|fin_year|                Name|pan_given| Party|  Type|_corrupt_record|field10|field11|field12|field13|field14|field9|
+--------------------+--------+--------------------+--------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+
|                null|    null|                null|    null|                null|     null|  null|  null|              [|   null|   null|   null|   null|   null|  null|
|16-B, Ferozeshah ...| 3000000|                CASH| 2010-11|          Aziz Pasha|        Y|   CPI|Others|           null|       |       |       |       |       |      |
|No.1, First Floor...|10000000|000037, HDFC Bank...| 2014-15|    V K Ramachandran|        Y|CPI(M)|Others|           null|       |       |       |    

In [81]:
from pyspark.sql.functions import sha2, concat_ws
df = df.withColumn("Address", sha2(concat_ws("||", "Address"), 256))
df.show()

+--------------------+--------+--------------------+--------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+
|             Address|  Amount|     mode_of_payment|fin_year|                Name|pan_given| Party|  Type|_corrupt_record|field10|field11|field12|field13|field14|field9|
+--------------------+--------+--------------------+--------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+
|e3b0c44298fc1c149...|    null|                null|    null|                null|     null|  null|  null|              [|   null|   null|   null|   null|   null|  null|
|5a3058deb6f337958...| 3000000|                CASH| 2010-11|          Aziz Pasha|        Y|   CPI|Others|           null|       |       |       |       |       |      |
|846539cb21bc9e6c6...|10000000|000037, HDFC Bank...| 2014-15|    V K Ramachandran|        Y|CPI(M)|Others|           null|       |       |       |    

In [82]:
df = df.drop("_corrupt_record","field10","field11","field12","field13","field14","field9")

In [83]:
df.show()

+--------------------+--------+--------------------+--------+--------------------+---------+------+------+
|             Address|  Amount|     mode_of_payment|fin_year|                Name|pan_given| Party|  Type|
+--------------------+--------+--------------------+--------+--------------------+---------+------+------+
|e3b0c44298fc1c149...|    null|                null|    null|                null|     null|  null|  null|
|5a3058deb6f337958...| 3000000|                CASH| 2010-11|          Aziz Pasha|        Y|   CPI|Others|
|846539cb21bc9e6c6...|10000000|000037, HDFC Bank...| 2014-15|    V K Ramachandran|        Y|CPI(M)|Others|
|869fa3a19f1c51ad1...|  108000|Cheque, State Ban...| 2014-15|  Dr. Manmohan Singh|        N|   INC|Others|
|5f04f40130569ddab...|   54000|Through Bank Tran...| 2011-12| Dr. Manda Jagnathan|        N|   INC|Others|
|524b1379d08e4c02f...|   54000|Through Bank Tran...| 2011-12|    Prof. K.V.Thomas|        N|   INC|Others|
|6db7af0c5dca3b333...|  100000|      

In [84]:
df = df.where(df.Name.isNotNull())
df.show()

+--------------------+--------+--------------------+--------+--------------------+---------+------+------+
|             Address|  Amount|     mode_of_payment|fin_year|                Name|pan_given| Party|  Type|
+--------------------+--------+--------------------+--------+--------------------+---------+------+------+
|5a3058deb6f337958...| 3000000|                CASH| 2010-11|          Aziz Pasha|        Y|   CPI|Others|
|846539cb21bc9e6c6...|10000000|000037, HDFC Bank...| 2014-15|    V K Ramachandran|        Y|CPI(M)|Others|
|869fa3a19f1c51ad1...|  108000|Cheque, State Ban...| 2014-15|  Dr. Manmohan Singh|        N|   INC|Others|
|5f04f40130569ddab...|   54000|Through Bank Tran...| 2011-12| Dr. Manda Jagnathan|        N|   INC|Others|
|524b1379d08e4c02f...|   54000|Through Bank Tran...| 2011-12|    Prof. K.V.Thomas|        N|   INC|Others|
|6db7af0c5dca3b333...|  100000|          146865 SBI| 2011-12|     Sweta Chyouksey|        Y|   BJP|Others|
|c01158e07376c3778...|  100000|      

In [85]:
from pyspark.sql.functions import lower, col, when, lit
df = df.withColumn("mode_of_payment", lower(col("mode_of_payment")))

In [86]:
df.show()

+--------------------+--------+--------------------+--------+--------------------+---------+------+------+
|             Address|  Amount|     mode_of_payment|fin_year|                Name|pan_given| Party|  Type|
+--------------------+--------+--------------------+--------+--------------------+---------+------+------+
|5a3058deb6f337958...| 3000000|                cash| 2010-11|          Aziz Pasha|        Y|   CPI|Others|
|846539cb21bc9e6c6...|10000000|000037, hdfc bank...| 2014-15|    V K Ramachandran|        Y|CPI(M)|Others|
|869fa3a19f1c51ad1...|  108000|cheque, state ban...| 2014-15|  Dr. Manmohan Singh|        N|   INC|Others|
|5f04f40130569ddab...|   54000|through bank tran...| 2011-12| Dr. Manda Jagnathan|        N|   INC|Others|
|524b1379d08e4c02f...|   54000|through bank tran...| 2011-12|    Prof. K.V.Thomas|        N|   INC|Others|
|6db7af0c5dca3b333...|  100000|          146865 sbi| 2011-12|     Sweta Chyouksey|        Y|   BJP|Others|
|c01158e07376c3778...|  100000|      

In [87]:
df = df.withColumn("mode_of_payment", when(col("mode_of_payment").contains("cash"),"Cash")
      .when(col("mode_of_payment").contains("cheque"),"Cheque")
      .when(col("mode_of_payment").contains("ch."),"Cheque")
      .when(col("mode_of_payment").contains("bank"),"Bank")
      .otherwise("Others"))

df.show()

+--------------------+--------+---------------+--------+--------------------+---------+------+------+
|             Address|  Amount|mode_of_payment|fin_year|                Name|pan_given| Party|  Type|
+--------------------+--------+---------------+--------+--------------------+---------+------+------+
|5a3058deb6f337958...| 3000000|           Cash| 2010-11|          Aziz Pasha|        Y|   CPI|Others|
|846539cb21bc9e6c6...|10000000|           Bank| 2014-15|    V K Ramachandran|        Y|CPI(M)|Others|
|869fa3a19f1c51ad1...|  108000|         Cheque| 2014-15|  Dr. Manmohan Singh|        N|   INC|Others|
|5f04f40130569ddab...|   54000|           Bank| 2011-12| Dr. Manda Jagnathan|        N|   INC|Others|
|524b1379d08e4c02f...|   54000|           Bank| 2011-12|    Prof. K.V.Thomas|        N|   INC|Others|
|6db7af0c5dca3b333...|  100000|         Others| 2011-12|     Sweta Chyouksey|        Y|   BJP|Others|
|c01158e07376c3778...|  100000|         Others| 2011-12|   Uma Shankar Gupta|     

In [88]:
from pyspark.sql.types import IntegerType
from pyspark.sql import functions
df = df.withColumn("Amount",col("Amount").cast(IntegerType()))
cf = df.groupBy("Party").agg(functions.sum("Amount"),functions.count("Party"),functions.avg("Amount"),functions.max("Amount"))
cf.show()

+------+-----------+------------+------------------+-----------+
| Party|sum(Amount)|count(Party)|       avg(Amount)|max(Amount)|
+------+-----------+------------+------------------+-----------+
|   INC| 4031487349|        3785|1065122.1529722589|  500000000|
|   BJP| 9295525996|        8782|1058474.8344340697|  500000000|
|   NCP|  647911419|         107| 6055246.906542056|   50000000|
|   CPI|   68123698|         384|177405.46354166666|    3000000|
|CPI(M)|  150622128|         515| 292470.1514563107|   10000000|
+------+-----------+------------+------------------+-----------+



In [89]:
#Aggregate Functions

def GetValueFromDataframe(df,columnName):
    a = []
    for row in df.rdd.collect():       
        a.append(row[columnName])
    return a

sum_arr = GetValueFromDataframe(cf,"sum(Amount)")
count_arr = GetValueFromDataframe(cf,"count(Party)")
avg_arr = GetValueFromDataframe(cf,"avg(Amount)")
max_arr = GetValueFromDataframe(cf,"max(Amount)")
party_arr = GetValueFromDataframe(cf,"Party")

i = 0
j = 0

party_arr_end = ["_SUM_LTD","_COUNT_LTD","_AVG_LTD","_MAX_LTD"]
agg_values = [sum_arr,count_arr,avg_arr,max_arr]

while i < 4:
    while j < len(party_arr):
        col_name = party_arr[j] + party_arr_end[i]
        df = df.withColumn(col_name,lit(0))
        df = df.withColumn(col_name, when(col("Party") == party_arr[j],agg_values[i][j])
        .otherwise(0))
        j += 1
    
    j = 0
    i += 1

df.show()

+--------------------+--------+---------------+--------+--------------------+---------+------+------+-----------+-----------+-----------+-----------+--------------+-------------+-------------+-------------+-------------+----------------+------------------+------------------+-----------+------------------+-----------------+-----------+-----------+-----------+-----------+--------------+
|             Address|  Amount|mode_of_payment|fin_year|                Name|pan_given| Party|  Type|INC_SUM_LTD|BJP_SUM_LTD|NCP_SUM_LTD|CPI_SUM_LTD|CPI(M)_SUM_LTD|INC_COUNT_LTD|BJP_COUNT_LTD|NCP_COUNT_LTD|CPI_COUNT_LTD|CPI(M)_COUNT_LTD|       INC_AVG_LTD|       BJP_AVG_LTD|NCP_AVG_LTD|       CPI_AVG_LTD|   CPI(M)_AVG_LTD|INC_MAX_LTD|BJP_MAX_LTD|NCP_MAX_LTD|CPI_MAX_LTD|CPI(M)_MAX_LTD|
+--------------------+--------+---------------+--------+--------------------+---------+------+------+-----------+-----------+-----------+-----------+--------------+-------------+-------------+-------------+-------------+----

In [90]:
inc_top_donor = df.filter(df["Amount"] == max_arr[0]).select("Name").collect()[0][0]

bjp_top_donor = df.filter(df["Amount"] == max_arr[1]).select("Name").collect()[0][0]

ncp_top_donor = df.filter(df["Amount"] == max_arr[2]).select("Name").collect()[0][0]

cpi_top_donor = df.filter(df["Amount"] == max_arr[3]).select("Name").collect()[0][0]

cpi_m_top_donor = df.filter(df["Amount"] == max_arr[4]).select("Name").collect()[0][0]


In [91]:
i = 0

top_donor_arr = [inc_top_donor,bjp_top_donor,ncp_top_donor,cpi_top_donor,cpi_m_top_donor]

while i < 5:
    col_name = party_arr[i] + "_TOP_DONOR"
    df = df.withColumn(col_name,lit(0))
    df = df.withColumn(col_name, when(col("Party") == party_arr[i],top_donor_arr[i])
    .otherwise(0))
    i += 1

df.select("Party","INC_TOP_DONOR","BJP_TOP_DONOR","NCP_TOP_DONOR","CPI_TOP_DONOR","CPI(M)_TOP_DONOR").show()

+------+--------------------+--------------------+-------------+-------------+----------------+
| Party|       INC_TOP_DONOR|       BJP_TOP_DONOR|NCP_TOP_DONOR|CPI_TOP_DONOR|CPI(M)_TOP_DONOR|
+------+--------------------+--------------------+-------------+-------------+----------------+
|   CPI|                   0|                   0|            0|   Aziz Pasha|               0|
|CPI(M)|                   0|                   0|            0|            0|V K Ramachandran|
|   INC|General Electoral...|                   0|            0|            0|               0|
|   INC|General Electoral...|                   0|            0|            0|               0|
|   INC|General Electoral...|                   0|            0|            0|               0|
|   BJP|                   0|General Electoral...|            0|            0|               0|
|   BJP|                   0|General Electoral...|            0|            0|               0|
|   BJP|                   0|General Ele

In [92]:
sdon = df.groupBy("Party","fin_year").sum("Amount").sort("fin_year")
sdon.show()

+------+--------+-----------+
| Party|fin_year|sum(Amount)|
+------+--------+-----------+
|   INC| 2003-04|   28301101|
|CPI(M)| 2003-04|     200000|
|   BJP| 2003-04|  116881973|
|   CPI| 2003-04|     779148|
|   INC| 2004-05|  320555643|
|CPI(M)| 2004-05|     896355|
|   CPI| 2004-05|     630000|
|   BJP| 2004-05|  339521289|
|   CPI| 2005-06|    3988690|
|   BJP| 2005-06|   36156111|
|CPI(M)| 2005-06|     550000|
|   INC| 2005-06|   59212492|
|CPI(M)| 2006-07|    1124719|
|   BJP| 2006-07|   29550672|
|   INC| 2006-07|  121273513|
|   CPI| 2006-07|    1229400|
|   CPI| 2007-08|    4125800|
|CPI(M)| 2007-08|    7226116|
|   BJP| 2007-08|  249623653|
|   NCP| 2007-08|   10225000|
+------+--------+-----------+
only showing top 20 rows



In [93]:
for row in sdon.collect():
    name = row[1] + "_" + row[0] + "_" + "SUM"
    df = df.withColumn(name,lit('-'))
    df = df.withColumn(name, when(col("Party") == row[0], row[2])
    .otherwise('-'))

df.show()

+--------------------+--------+---------------+--------+--------------------+---------+------+------+-----------+-----------+-----------+-----------+--------------+-------------+-------------+-------------+-------------+----------------+------------------+------------------+-----------+------------------+-----------------+-----------+-----------+-----------+-----------+--------------+--------------------+--------------------+-------------+-------------+----------------+---------------+---------------+---------------+------------------+---------------+------------------+---------------+---------------+------------------+---------------+---------------+---------------+---------------+------------------+---------------+---------------+---------------+---------------+---------------+------------------+---------------+---------------+------------------+---------------+---------------+---------------+---------------+------------------+---------------+---------------+---------------+---------

In [100]:
ndon = df.groupBy("mode_of_payment").count()
ndon.show()

+---------------+-----+
|mode_of_payment|count|
+---------------+-----+
|           Bank| 7338|
|         Cheque| 2781|
|           Cash|  918|
|         Others| 2536|
+---------------+-----+



In [99]:
for row in ndon.collect():
    name = row[0] + "_COUNT_LTD"
    df = df.withColumn(name,lit('-'))
    df = df.withColumn(name, when(col("mode_of_payment") == row[0], row[1])
    .otherwise('-'))

df.select("mode_of_payment","BANK_COUNT_LTD","CHEQUE_COUNT_LTD","CASH_COUNT_LTD","OTHERS_COUNT_LTD").show()

+---------------+--------------+----------------+--------------+----------------+
|mode_of_payment|BANK_COUNT_LTD|CHEQUE_COUNT_LTD|CASH_COUNT_LTD|OTHERS_COUNT_LTD|
+---------------+--------------+----------------+--------------+----------------+
|           Cash|             -|               -|           918|               -|
|           Bank|          7338|               -|             -|               -|
|         Cheque|             -|            2781|             -|               -|
|           Bank|          7338|               -|             -|               -|
|           Bank|          7338|               -|             -|               -|
|         Others|             -|               -|             -|            2536|
|         Others|             -|               -|             -|            2536|
|           Bank|          7338|               -|             -|               -|
|         Others|             -|               -|             -|            2536|
|           Bank

In [101]:
df.write.parquet("donations.parquet")

Py4JJavaError: An error occurred while calling o2558.parquet.
: java.lang.RuntimeException: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
	at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:735)
	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:270)
	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:286)
	at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:978)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkOneDirWithMode(RawLocalFileSystem.java:660)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:700)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.ChecksumFileSystem.mkdirs(ChecksumFileSystem.java:788)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:356)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupJob(HadoopMapReduceCommitProtocol.scala:188)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:209)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:186)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:793)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Unknown Source)
Caused by: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
	at org.apache.hadoop.util.Shell.fileNotFoundException(Shell.java:547)
	at org.apache.hadoop.util.Shell.getHadoopHomeDir(Shell.java:568)
	at org.apache.hadoop.util.Shell.getQualifiedBin(Shell.java:591)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:688)
	at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:79)
	at org.apache.hadoop.conf.Configuration.getTimeDurationHelper(Configuration.java:1907)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1867)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1840)
	at org.apache.hadoop.util.ShutdownHookManager.getShutdownTimeout(ShutdownHookManager.java:183)
	at org.apache.hadoop.util.ShutdownHookManager$HookEntry.<init>(ShutdownHookManager.java:207)
	at org.apache.hadoop.util.ShutdownHookManager.addShutdownHook(ShutdownHookManager.java:304)
	at org.apache.spark.util.SparkShutdownHookManager.install(ShutdownHookManager.scala:181)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks$lzycompute(ShutdownHookManager.scala:50)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks(ShutdownHookManager.scala:48)
	at org.apache.spark.util.ShutdownHookManager$.addShutdownHook(ShutdownHookManager.scala:153)
	at org.apache.spark.util.ShutdownHookManager$.<init>(ShutdownHookManager.scala:58)
	at org.apache.spark.util.ShutdownHookManager$.<clinit>(ShutdownHookManager.scala)
	at org.apache.spark.util.Utils$.createTempDir(Utils.scala:343)
	at org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:344)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:901)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1046)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1055)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset.
	at org.apache.hadoop.util.Shell.checkHadoopHomeInner(Shell.java:467)
	at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:438)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:515)
	... 22 more
