In [65]:
import pyspark
import json

In [66]:
from pyspark.sql import SparkSession

# Build (or reuse) the Spark session for this notebook.
# The former `.config("spark.some.config.option", "some-value")` line was a placeholder
# copied from the Spark docs example and configured nothing, so it has been removed.
spark = (
    SparkSession.builder
    .appName("Python Spark assignment")
    .getOrCreate()
)

In [67]:
#Task1: Import Json
# NOTE(review): the lone "[" row that ends up in _corrupt_record suggests the file is one
# JSON array rather than JSON Lines; spark.read.json(..., multiLine=True) may parse it
# cleanly — confirm against the file before switching.
df = spark.read.format("json").load("donation_np.json")
df.printSchema()
# Print the first rows of the DataFrame to stdout
df.show()

root
 |-- Address: string (nullable = true)
 |-- Amount: string (nullable = true)
 |-- Contribution Mode: string (nullable = true)
 |-- Financial Year: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- PAN Given: string (nullable = true)
 |-- Party: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- _corrupt_record: string (nullable = true)
 |-- field10: string (nullable = true)
 |-- field11: string (nullable = true)
 |-- field12: string (nullable = true)
 |-- field13: string (nullable = true)
 |-- field14: string (nullable = true)
 |-- field9: string (nullable = true)

+--------------------+--------+--------------------+--------------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+
|             Address|  Amount|   Contribution Mode|Financial Year|                Name|PAN Given| Party|  Type|_corrupt_record|field10|field11|field12|field13|field14|field9|
+--------------------+--------+---------

In [68]:
#Task2: Rename Column
# Give the space-containing source columns snake_case names
df = (
    df.withColumnRenamed("Contribution Mode", "mode_of_payment")
    .withColumnRenamed("Financial Year", "fin_year")
    .withColumnRenamed("PAN Given", "pan_given")
)

df.show()

+--------------------+--------+--------------------+--------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+
|             Address|  Amount|     mode_of_payment|fin_year|                Name|pan_given| Party|  Type|_corrupt_record|field10|field11|field12|field13|field14|field9|
+--------------------+--------+--------------------+--------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+
|                null|    null|                null|    null|                null|     null|  null|  null|              [|   null|   null|   null|   null|   null|  null|
|16-B, Ferozeshah ...| 3000000|                CASH| 2010-11|          Aziz Pasha|        Y|   CPI|Others|           null|       |       |       |       |       |      |
|No.1, First Floor...|10000000|000037, HDFC Bank...| 2014-15|    V K Ramachandran|        Y|CPI(M)|Others|           null|       |       |       |    

In [69]:
# Drop rows whose Name is null (in the output above this also removes the "[" corrupt row)
df = df.dropna(subset=["Name"])
df.show()

+--------------------+--------+--------------------+--------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+
|             Address|  Amount|     mode_of_payment|fin_year|                Name|pan_given| Party|  Type|_corrupt_record|field10|field11|field12|field13|field14|field9|
+--------------------+--------+--------------------+--------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+
|16-B, Ferozeshah ...| 3000000|                CASH| 2010-11|          Aziz Pasha|        Y|   CPI|Others|           null|       |       |       |       |       |      |
|No.1, First Floor...|10000000|000037, HDFC Bank...| 2014-15|    V K Ramachandran|        Y|CPI(M)|Others|           null|       |       |       |       |       |      |
|3, Motilal Nehru ...|  108000|Cheque, State Ban...| 2014-15|  Dr. Manmohan Singh|        N|   INC|Others|           null|       |       |       |    

In [70]:
[field.name for field in df.schema.fields]

['Address',
 'Amount',
 'mode_of_payment',
 'fin_year',
 'Name',
 'pan_given',
 'Party',
 'Type',
 '_corrupt_record',
 'field10',
 'field11',
 'field12',
 'field13',
 'field14',
 'field9']

In [71]:
# Remove the parse-error column and the extra field9..field14 columns (empty in the rows shown above)
df = df.drop(*["_corrupt_record", "field9", "field10", "field11", "field12", "field13", "field14"])

In [72]:
[field.name for field in df.schema.fields]

['Address',
 'Amount',
 'mode_of_payment',
 'fin_year',
 'Name',
 'pan_given',
 'Party',
 'Type']

In [73]:
df.show(n=20, truncate=True)  # preview the cleaned frame

+--------------------+--------+--------------------+--------+--------------------+---------+------+------+
|             Address|  Amount|     mode_of_payment|fin_year|                Name|pan_given| Party|  Type|
+--------------------+--------+--------------------+--------+--------------------+---------+------+------+
|16-B, Ferozeshah ...| 3000000|                CASH| 2010-11|          Aziz Pasha|        Y|   CPI|Others|
|No.1, First Floor...|10000000|000037, HDFC Bank...| 2014-15|    V K Ramachandran|        Y|CPI(M)|Others|
|3, Motilal Nehru ...|  108000|Cheque, State Ban...| 2014-15|  Dr. Manmohan Singh|        N|   INC|Others|
|9,Firozshah Road ...|   54000|Through Bank Tran...| 2011-12| Dr. Manda Jagnathan|        N|   INC|Others|
|17,Dr.B.R.Mehta L...|   54000|Through Bank Tran...| 2011-12|    Prof. K.V.Thomas|        N|   INC|Others|
|31 Shamla Hills B...|  100000|          146865 SBI| 2011-12|     Sweta Chyouksey|        Y|   BJP|Others|
|45 Bunglows Bhopa...|  100000|      

In [74]:
#Task3: Encrypt Column Address
# Replace each address with its SHA-256 hex digest. This is one-way hashing
# (pseudonymisation), not reversible encryption.
# Hash the column directly: wrapping it in concat_ws turned null addresses into "" and
# gave every missing address the same sha2("") digest. sha2 of a null stays null.
# NOTE(review): an unsalted hash of guessable values such as street addresses can be
# reversed by a dictionary attack; add a secret salt if real anonymity is required.
from pyspark.sql.functions import sha2
df = df.withColumn("Address", sha2(df.Address, 256))
df.show()


+--------------------+--------+--------------------+--------+--------------------+---------+------+------+
|             Address|  Amount|     mode_of_payment|fin_year|                Name|pan_given| Party|  Type|
+--------------------+--------+--------------------+--------+--------------------+---------+------+------+
|5a3058deb6f337958...| 3000000|                CASH| 2010-11|          Aziz Pasha|        Y|   CPI|Others|
|846539cb21bc9e6c6...|10000000|000037, HDFC Bank...| 2014-15|    V K Ramachandran|        Y|CPI(M)|Others|
|869fa3a19f1c51ad1...|  108000|Cheque, State Ban...| 2014-15|  Dr. Manmohan Singh|        N|   INC|Others|
|5f04f40130569ddab...|   54000|Through Bank Tran...| 2011-12| Dr. Manda Jagnathan|        N|   INC|Others|
|524b1379d08e4c02f...|   54000|Through Bank Tran...| 2011-12|    Prof. K.V.Thomas|        N|   INC|Others|
|6db7af0c5dca3b333...|  100000|          146865 SBI| 2011-12|     Sweta Chyouksey|        Y|   BJP|Others|
|c01158e07376c3778...|  100000|      

In [75]:
#Task4: Categorize
# Explicit imports instead of `from pyspark.sql.functions import *`: the wildcard
# shadows Python builtins such as sum/max/min/abs/round for the rest of the notebook.
# `count` and `lit` are not used in this cell but later cells rely on them.
from pyspark.sql.functions import count, lit, when

# Collapse free-text payment descriptions into four buckets. The first matching
# pattern wins, so cheque descriptions that mention a bank stay "Cheque".
# NOTE(review): LIKE is case-sensitive — e.g. "Cash"/"cash" will not match "%CASH%" and
# falls into "Others"; confirm whether that is intended for this data.
df = df.withColumn(
    "mode_of_payment",
    when(df.mode_of_payment.like("Ch.%"), "Cheque")
    .when(df.mode_of_payment.like("%Cheque%"), "Cheque")
    .when(df.mode_of_payment.like("%CASH%"), "Cash")
    .when(df.mode_of_payment.like("%Bank%"), "Bank")
    .otherwise("Others"),
)
df.show()

        
                                    

+--------------------+--------+---------------+--------+--------------------+---------+------+------+
|             Address|  Amount|mode_of_payment|fin_year|                Name|pan_given| Party|  Type|
+--------------------+--------+---------------+--------+--------------------+---------+------+------+
|5a3058deb6f337958...| 3000000|           Cash| 2010-11|          Aziz Pasha|        Y|   CPI|Others|
|846539cb21bc9e6c6...|10000000|           Bank| 2014-15|    V K Ramachandran|        Y|CPI(M)|Others|
|869fa3a19f1c51ad1...|  108000|         Cheque| 2014-15|  Dr. Manmohan Singh|        N|   INC|Others|
|5f04f40130569ddab...|   54000|           Bank| 2011-12| Dr. Manda Jagnathan|        N|   INC|Others|
|524b1379d08e4c02f...|   54000|           Bank| 2011-12|    Prof. K.V.Thomas|        N|   INC|Others|
|6db7af0c5dca3b333...|  100000|         Others| 2011-12|     Sweta Chyouksey|        Y|   BJP|Others|
|c01158e07376c3778...|  100000|         Others| 2011-12|   Uma Shankar Gupta|     

In [76]:
#Task5: Calculate Aggregates

#Typecast
# Cast Amount to a 64-bit integer. With a 32-bit IntegerType cast, any amount above
# 2,147,483,647 silently becomes null instead of raising an error.
from pyspark.sql.types import LongType
df = df.withColumn("Amount", df["Amount"].cast(LongType()))

def party_aggregate(party_type, frame=None):
    """Return ``[sum, max, avg, count]`` of ``Amount`` for donations to one party.

    Parameters
    ----------
    party_type : str
        Exact value of the ``Party`` column to aggregate over.
    frame : DataFrame, optional
        Source data. Defaults to the notebook-level ``df``, which is what the
        original implementation always used.

    Returns
    -------
    list
        ``[sum, max, avg, count]`` of ``Amount`` for that party; ``count`` counts
        non-null amounts. If the party has no rows, sum/max/avg are None and count
        is 0 (the previous version raised IndexError in that case).
    """
    from pyspark.sql import functions as F

    source = df if frame is None else frame
    # One filtered global aggregation (a single Spark job) instead of four separate
    # groupBy + collect round trips. A global agg always yields exactly one row.
    row = (
        source.where(F.col("Party") == party_type)
        .agg(
            F.sum("Amount"),
            F.max("Amount"),
            F.avg("Amount"),
            F.count("Amount"),
        )
        .collect()[0]
    )
    return list(row)


# [sum, max, avg, count] of Amount for each party of interest
(result_list_INC, result_list_BJP, result_list_NCP,
 result_list_CPI, result_list_CPIM) = (
    party_aggregate(party) for party in ("INC", "BJP", "NCP", "CPI", "CPI(M)")
)




def create_new_column(df,col_name,party_name,agg):
    """Return ``df`` with column ``col_name`` = ``agg`` on ``party_name`` rows, "0" elsewhere.

    Parameters
    ----------
    df : DataFrame
        Frame with a ``Party`` column.
    col_name : str
        Name of the column to add (replaced if it already exists).
    party_name : str
        Party whose rows receive ``agg``; matched exactly.
    agg : scalar
        Value written on matching rows.
    """
    # Exact equality rather than LIKE, so "%" or "_" in a party name are not
    # treated as wildcards. The former `withColumn(col_name, lit(agg))` step was
    # dead work: it was overwritten by the next withColumn immediately.
    # NOTE(review): mixing a numeric `agg` with the string "0" presumably makes Spark
    # coerce this column to string — kept for compatibility; consider a numeric 0.
    return df.withColumn(col_name, when(df.Party == party_name, agg).otherwise("0"))

def add_partywise_agg_col(df, party_name, party_agg_list):
    """Add <party>_SUM_LTD, _MAX_LTD, _AVG_LTD and _COUNT_LTD columns to ``df``.

    ``party_agg_list`` holds the values in that order (sum, max, avg, count), as
    returned by ``party_aggregate``.
    """
    suffixes = ("_SUM_LTD", "_MAX_LTD", "_AVG_LTD", "_COUNT_LTD")
    for position, suffix in enumerate(suffixes):
        df = create_new_column(df, party_name + suffix, party_name, party_agg_list[position])
    return df

# Attach the per-party aggregate columns, one party at a time
for party_name, party_aggs in (
    ("INC", result_list_INC),
    ("BJP", result_list_BJP),
    ("NCP", result_list_NCP),
    ("CPI", result_list_CPI),
    ("CPI(M)", result_list_CPIM),
):
    df = add_partywise_agg_col(df, party_name, party_aggs)
df.show()

+--------------------+--------+---------------+--------+--------------------+---------+------+------+-----------+-----------+------------------+-------------+-----------+-----------+------------------+-------------+-----------+-----------+-----------+-------------+-----------+-----------+------------------+-------------+--------------+--------------+-----------------+----------------+
|             Address|  Amount|mode_of_payment|fin_year|                Name|pan_given| Party|  Type|INC_SUM_LTD|INC_MAX_LTD|       INC_AVG_LTD|INC_COUNT_LTD|BJP_SUM_LTD|BJP_MAX_LTD|       BJP_AVG_LTD|BJP_COUNT_LTD|NCP_SUM_LTD|NCP_MAX_LTD|NCP_AVG_LTD|NCP_COUNT_LTD|CPI_SUM_LTD|CPI_MAX_LTD|       CPI_AVG_LTD|CPI_COUNT_LTD|CPI(M)_SUM_LTD|CPI(M)_MAX_LTD|   CPI(M)_AVG_LTD|CPI(M)_COUNT_LTD|
+--------------------+--------+---------------+--------+--------------------+---------+------+------+-----------+-----------+------------------+-------------+-----------+-----------+------------------+-------------+---------

In [77]:
#Top donor for party:BJP
# Donor(s) of the largest single BJP donation. Every donor tied at that amount is
# reported (sorted, comma-joined) instead of whichever row collect() returned first.
# Party names are matched with == so they are compared literally.
party_rows = df.where(df.Party == "BJP")
amt = party_rows.agg({"Amount": "max"}).collect()[0][0]
name = party_rows.where(party_rows.Amount == amt).select("Name")

bjp_top = ", ".join(sorted(r["Name"] for r in name.collect() if r["Name"] is not None)) or "N.A"
df = df.withColumn("BJP_TOP_DONOR", when(df.Party == "BJP", bjp_top).otherwise("N.A"))
df.show(5)


+--------------------+--------+---------------+--------+-------------------+---------+------+------+-----------+-----------+------------------+-------------+-----------+-----------+-----------+-------------+-----------+-----------+-----------+-------------+-----------+-----------+------------------+-------------+--------------+--------------+-----------------+----------------+-------------+
|             Address|  Amount|mode_of_payment|fin_year|               Name|pan_given| Party|  Type|INC_SUM_LTD|INC_MAX_LTD|       INC_AVG_LTD|INC_COUNT_LTD|BJP_SUM_LTD|BJP_MAX_LTD|BJP_AVG_LTD|BJP_COUNT_LTD|NCP_SUM_LTD|NCP_MAX_LTD|NCP_AVG_LTD|NCP_COUNT_LTD|CPI_SUM_LTD|CPI_MAX_LTD|       CPI_AVG_LTD|CPI_COUNT_LTD|CPI(M)_SUM_LTD|CPI(M)_MAX_LTD|   CPI(M)_AVG_LTD|CPI(M)_COUNT_LTD|BJP_TOP_DONOR|
+--------------------+--------+---------------+--------+-------------------+---------+------+------+-----------+-----------+------------------+-------------+-----------+-----------+-----------+-------------+-----

In [78]:
#Top donor for party:CPI
# Donor(s) of the largest single CPI donation. Every donor tied at that amount is
# reported (sorted, comma-joined) instead of whichever row collect() returned first.
# == (not LIKE) matches "CPI" literally; it never matches "CPI(M)".
party_rows = df.where(df.Party == "CPI")
amt = party_rows.agg({"Amount": "max"}).collect()[0][0]
name = party_rows.where(party_rows.Amount == amt).select("Name")
name.show()

cpi_top = ", ".join(sorted(r["Name"] for r in name.collect() if r["Name"] is not None)) or "N.A"
df = df.withColumn("CPI_TOP_DONOR", when(df.Party == "CPI", cpi_top).otherwise("N.A"))
df.show(5)

+----------+
|      Name|
+----------+
|Aziz Pasha|
+----------+

+--------------------+--------+---------------+--------+-------------------+---------+------+------+-----------+-----------+------------------+-------------+-----------+-----------+-----------+-------------+-----------+-----------+-----------+-------------+-----------+-----------+------------------+-------------+--------------+--------------+-----------------+----------------+-------------+-------------+
|             Address|  Amount|mode_of_payment|fin_year|               Name|pan_given| Party|  Type|INC_SUM_LTD|INC_MAX_LTD|       INC_AVG_LTD|INC_COUNT_LTD|BJP_SUM_LTD|BJP_MAX_LTD|BJP_AVG_LTD|BJP_COUNT_LTD|NCP_SUM_LTD|NCP_MAX_LTD|NCP_AVG_LTD|NCP_COUNT_LTD|CPI_SUM_LTD|CPI_MAX_LTD|       CPI_AVG_LTD|CPI_COUNT_LTD|CPI(M)_SUM_LTD|CPI(M)_MAX_LTD|   CPI(M)_AVG_LTD|CPI(M)_COUNT_LTD|BJP_TOP_DONOR|CPI_TOP_DONOR|
+--------------------+--------+---------------+--------+-------------------+---------+------+------+-----------+------

In [79]:
#Top donor for party:INC
# Donor(s) of the largest single INC donation. Every donor tied at that amount is
# reported (sorted, comma-joined) instead of whichever row collect() returned first.
party_rows = df.where(df.Party == "INC")
amt = party_rows.agg({"Amount": "max"}).collect()[0][0]
name = party_rows.where(party_rows.Amount == amt).select("Name")
name.show()

inc_top = ", ".join(sorted(r["Name"] for r in name.collect() if r["Name"] is not None)) or "N.A"
df = df.withColumn("INC_TOP_DONOR", when(df.Party == "INC", inc_top).otherwise("N.A"))
df.show(5)

+--------------------+
|                Name|
+--------------------+
|General Electoral...|
+--------------------+

+--------------------+--------+---------------+--------+-------------------+---------+------+------+-----------+-----------+------------------+-------------+-----------+-----------+-----------+-------------+-----------+-----------+-----------+-------------+-----------+-----------+------------------+-------------+--------------+--------------+-----------------+----------------+-------------+-------------+--------------------+
|             Address|  Amount|mode_of_payment|fin_year|               Name|pan_given| Party|  Type|INC_SUM_LTD|INC_MAX_LTD|       INC_AVG_LTD|INC_COUNT_LTD|BJP_SUM_LTD|BJP_MAX_LTD|BJP_AVG_LTD|BJP_COUNT_LTD|NCP_SUM_LTD|NCP_MAX_LTD|NCP_AVG_LTD|NCP_COUNT_LTD|CPI_SUM_LTD|CPI_MAX_LTD|       CPI_AVG_LTD|CPI_COUNT_LTD|CPI(M)_SUM_LTD|CPI(M)_MAX_LTD|   CPI(M)_AVG_LTD|CPI(M)_COUNT_LTD|BJP_TOP_DONOR|CPI_TOP_DONOR|       INC_TOP_DONOR|
+--------------------+----

In [80]:
#Top donor for party:CPI(M)
# Donor(s) of the largest single CPI(M) donation. Every donor tied at that amount is
# reported (sorted, comma-joined) instead of whichever row collect() returned first.
party_rows = df.where(df.Party == "CPI(M)")
amt = party_rows.agg({"Amount": "max"}).collect()[0][0]
name = party_rows.where(party_rows.Amount == amt).select("Name")
name.show()

cpim_top = ", ".join(sorted(r["Name"] for r in name.collect() if r["Name"] is not None)) or "N.A"
df = df.withColumn("CPI(M)_TOP_DONOR", when(df.Party == "CPI(M)", cpim_top).otherwise("N.A"))
df.show(5)

+----------------+
|            Name|
+----------------+
|V K Ramachandran|
+----------------+

+--------------------+--------+---------------+--------+-------------------+---------+------+------+-----------+-----------+------------------+-------------+-----------+-----------+-----------+-------------+-----------+-----------+-----------+-------------+-----------+-----------+------------------+-------------+--------------+--------------+-----------------+----------------+-------------+-------------+--------------------+----------------+
|             Address|  Amount|mode_of_payment|fin_year|               Name|pan_given| Party|  Type|INC_SUM_LTD|INC_MAX_LTD|       INC_AVG_LTD|INC_COUNT_LTD|BJP_SUM_LTD|BJP_MAX_LTD|BJP_AVG_LTD|BJP_COUNT_LTD|NCP_SUM_LTD|NCP_MAX_LTD|NCP_AVG_LTD|NCP_COUNT_LTD|CPI_SUM_LTD|CPI_MAX_LTD|       CPI_AVG_LTD|CPI_COUNT_LTD|CPI(M)_SUM_LTD|CPI(M)_MAX_LTD|   CPI(M)_AVG_LTD|CPI(M)_COUNT_LTD|BJP_TOP_DONOR|CPI_TOP_DONOR|       INC_TOP_DONOR|CPI(M)_TOP_DONOR|
+-----------

In [81]:
#Top donor for party:NCP
# Donor(s) of the largest single NCP donation. NCP has a tie at the top (two names
# were listed by name.show() previously); the old collect()[0][0] silently kept only
# one of them. All tied donors are now reported, sorted and comma-joined.
party_rows = df.where(df.Party == "NCP")
amt = party_rows.agg({"Amount": "max"}).collect()[0][0]
name = party_rows.where(party_rows.Amount == amt).select("Name")
name.show()

ncp_top = ", ".join(sorted(r["Name"] for r in name.collect() if r["Name"] is not None)) or "N.A"
df = df.withColumn("NCP_TOP_DONOR", when(df.Party == "NCP", ncp_top).otherwise("N.A"))
df.show(5)

+--------------------+
|                Name|
+--------------------+
|Lodha Dwellers Pv...|
|Satya Electoral T...|
+--------------------+

+--------------------+--------+---------------+--------+-------------------+---------+------+------+-----------+-----------+------------------+-------------+-----------+-----------+-----------+-------------+-----------+-----------+-----------+-------------+-----------+-----------+------------------+-------------+--------------+--------------+-----------------+----------------+-------------+-------------+--------------------+----------------+-------------+
|             Address|  Amount|mode_of_payment|fin_year|               Name|pan_given| Party|  Type|INC_SUM_LTD|INC_MAX_LTD|       INC_AVG_LTD|INC_COUNT_LTD|BJP_SUM_LTD|BJP_MAX_LTD|BJP_AVG_LTD|BJP_COUNT_LTD|NCP_SUM_LTD|NCP_MAX_LTD|NCP_AVG_LTD|NCP_COUNT_LTD|CPI_SUM_LTD|CPI_MAX_LTD|       CPI_AVG_LTD|CPI_COUNT_LTD|CPI(M)_SUM_LTD|CPI(M)_MAX_LTD|   CPI(M)_AVG_LTD|CPI(M)_COUNT_LTD|BJP_TOP_DONOR|CPI_TOP_

In [82]:
#SUM OF DONATION PER YEAR
# Adds one "<fin_year>_<Party>_SUM" column per (Party, fin_year) group. It holds the
# group's total Amount on that group's rows and "N.A" on every other row.
year_party_sums = df.groupBy("Party", "fin_year").sum("Amount").collect()

# All new columns are built first and added with a single select(). Calling
# withColumn once per column in a loop grows the query plan with every call.
# Groups whose Party or fin_year is null are skipped: no column name can be built
# for them (the old string concatenation raised TypeError on None).
yearly_sum_cols = [
    when((df.fin_year == row["fin_year"]) & (df.Party == row["Party"]), row["sum(Amount)"])
    .otherwise("N.A")
    .alias(row["fin_year"] + "_" + row["Party"] + "_SUM")
    for row in year_party_sums
    if row["fin_year"] is not None and row["Party"] is not None
]
df = df.select("*", *yearly_sum_cols)

df.show(40)

+--------------------+--------+---------------+--------+--------------------+---------+------+------+-----------+-----------+------------------+-------------+-----------+-----------+------------------+-------------+-----------+-----------+-----------+-------------+-----------+-----------+------------------+-------------+--------------+--------------+-----------------+----------------+--------------------+-------------+--------------------+----------------+-------------+---------------+---------------+---------------+---------------+------------------+---------------+---------------+------------------+---------------+---------------+---------------+---------------+------------------+------------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+------------------+---------------

In [83]:
df10=df.groupBy("mode_of_payment").count()
df10.show()

# Look the counts up by label. groupBy output order is not guaranteed (above it came
# back Bank, Cheque, Cash, Others), so the old positional df10.collect()[i] paired each
# label with another mode's count. A single collect also replaces 8 repeated ones.
mode_counts = {row["mode_of_payment"]: row["count"] for row in df10.collect()}

payment_modes = ['Cash', 'Cheque', 'Others', 'Bank']
for mode in payment_modes:
    # "<mode>_COUNT_LTD": number of donations in this mode, on that mode's rows; "0" elsewhere.
    df = df.withColumn(
        mode + "_COUNT_LTD",
        when(df.mode_of_payment == mode, mode_counts.get(mode, 0)).otherwise("0"),
    )


df.show(150)


+---------------+-----+
|mode_of_payment|count|
+---------------+-----+
|           Bank| 6845|
|         Cheque| 2683|
|           Cash|   29|
|         Others| 4016|
+---------------+-----+

+--------------------+--------+---------------+--------+--------------------+---------+------+------+-----------+-----------+------------------+-------------+-----------+-----------+------------------+-------------+-----------+-----------+-----------------+-------------+-----------+-----------+------------------+-------------+--------------+--------------+-----------------+----------------+--------------------+-------------+--------------------+----------------+--------------------+---------------+---------------+---------------+---------------+------------------+---------------+---------------+------------------+---------------+---------------+---------------+---------------+------------------+------------------+---------------+---------------+---------------+---------------+---------------+----

In [84]:
import re

# Parquet writers in older Spark releases (TODO confirm: before 3.3) reject column names
# containing any of " ,;{}()\n\t=" — e.g. "CPI(M)_SUM_LTD". Names are therefore sanitised
# on a copy before writing. mode("overwrite") lets the cell be re-run instead of failing
# because "parquet_final" already exists.
# NOTE: the "HADOOP_HOME and hadoop.home.dir are unset" error this cell used to raise is a
# Windows environment problem (Hadoop's winutils is missing), not a code bug — see
# https://wiki.apache.org/hadoop/WindowsProblems.
parquet_ready = df.toDF(*[re.sub(r"[ ,;{}()\n\t=]", "_", c) for c in df.columns])
parquet_ready.write.mode("overwrite").parquet("parquet_final")

Py4JJavaError: An error occurred while calling o5126.parquet.
: java.lang.RuntimeException: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
	at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:735)
	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:270)
	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:286)
	at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:978)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkOneDirWithMode(RawLocalFileSystem.java:660)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:700)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.ChecksumFileSystem.mkdirs(ChecksumFileSystem.java:788)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:356)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupJob(HadoopMapReduceCommitProtocol.scala:188)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:209)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:186)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:793)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Unknown Source)
Caused by: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
	at org.apache.hadoop.util.Shell.fileNotFoundException(Shell.java:547)
	at org.apache.hadoop.util.Shell.getHadoopHomeDir(Shell.java:568)
	at org.apache.hadoop.util.Shell.getQualifiedBin(Shell.java:591)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:688)
	at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:79)
	at org.apache.hadoop.conf.Configuration.getTimeDurationHelper(Configuration.java:1907)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1867)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1840)
	at org.apache.hadoop.util.ShutdownHookManager.getShutdownTimeout(ShutdownHookManager.java:183)
	at org.apache.hadoop.util.ShutdownHookManager$HookEntry.<init>(ShutdownHookManager.java:207)
	at org.apache.hadoop.util.ShutdownHookManager.addShutdownHook(ShutdownHookManager.java:304)
	at org.apache.spark.util.SparkShutdownHookManager.install(ShutdownHookManager.scala:181)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks$lzycompute(ShutdownHookManager.scala:50)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks(ShutdownHookManager.scala:48)
	at org.apache.spark.util.ShutdownHookManager$.addShutdownHook(ShutdownHookManager.scala:153)
	at org.apache.spark.util.ShutdownHookManager$.<init>(ShutdownHookManager.scala:58)
	at org.apache.spark.util.ShutdownHookManager$.<clinit>(ShutdownHookManager.scala)
	at org.apache.spark.util.Utils$.createTempDir(Utils.scala:343)
	at org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:344)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:901)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1046)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1055)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset.
	at org.apache.hadoop.util.Shell.checkHadoopHomeInner(Shell.java:467)
	at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:438)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:515)
	... 22 more
