In [244]:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("PySpark DataFrame and Sql") \
    .getOrCreate()

In [245]:
df = spark.read.format("csv").option("header", "true").load("/common_folder/nyc_parking/Parking_Violations_Issued_-_Fiscal_Year_2017.csv")
df

DataFrame[Summons Number: string, Plate ID: string, Registration State: string, Issue Date: string, Violation Code: string, Vehicle Body Type: string, Vehicle Make: string, Violation Precinct: string, Issuer Precinct: string, Violation Time: string]

In [246]:
df.show()

+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+
|Summons Number|Plate ID|Registration State|Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|
+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+
|    5092469481| GZH7067|                NY|2016-07-10|             7|             SUBN|       TOYOT|                 0|              0|         0143A|
|    5092451658| GZH7067|                NY|2016-07-08|             7|             SUBN|       TOYOT|                 0|              0|         0400P|
|    4006265037| FZX9232|                NY|2016-08-23|             5|             SUBN|        FORD|                 0|              0|         0233P|
|    8478629828| 66623ME|                NY|2017-06-14|            47|             REFG|

In [247]:
df.printSchema()

root
 |-- Summons Number: string (nullable = true)
 |-- Plate ID: string (nullable = true)
 |-- Registration State: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Violation Code: string (nullable = true)
 |-- Vehicle Body Type: string (nullable = true)
 |-- Vehicle Make: string (nullable = true)
 |-- Violation Precinct: string (nullable = true)
 |-- Issuer Precinct: string (nullable = true)
 |-- Violation Time: string (nullable = true)



In [248]:
print((df.count(), len(df.columns)))

(10803028, 10)


## Examine Data

In [249]:
from pyspark.sql.functions import expr, col, column

print(df.select(col('Summons Number')).distinct().count())

10803028


###  1. Total number of parking tickets :  10803028

In [250]:
df.select(col('Registration State')).distinct().show()
print(df.select(col('Registration State')).distinct().count())

+------------------+
|Registration State|
+------------------+
|                AZ|
|                SC|
|                NS|
|                LA|
|                MN|
|                NJ|
|                MX|
|                DC|
|                OR|
|                99|
|                NT|
|                VA|
|                RI|
|                KY|
|                WY|
|                BC|
|                NH|
|                MI|
|                GV|
|                NV|
+------------------+
only showing top 20 rows

67


### Registration State '99' seems to be erronous data.
### Need to replace by state having maximum entries..

In [251]:
df_grouped_RegistrationState = df.groupby('Registration State').agg({'Summons Number' : 'count'})

In [252]:
df_grouped_RegistrationState = df_grouped_RegistrationState.sort(['count(Summons Number)'], ascending = False)
df_grouped_RegistrationState.show()

+------------------+---------------------+
|Registration State|count(Summons Number)|
+------------------+---------------------+
|                NY|              8481061|
|                NJ|               925965|
|                PA|               285419|
|                FL|               144556|
|                CT|               141088|
|                MA|                85547|
|                IN|                80749|
|                VA|                72626|
|                MD|                61800|
|                NC|                55806|
|                IL|                37329|
|                GA|                36852|
|                99|                36625|
|                TX|                36516|
|                AZ|                26426|
|                OH|                25302|
|                CA|                24260|
|                SC|                21836|
|                ME|                21574|
|                MN|                18227|
+----------

### Replacing Registration State '99' by 'NY'(maximum in terms of tickets)

In [253]:
from pyspark.sql import functions as F

df = df.withColumn('Registration State',
    F.when(df['Registration State']=='99','NY').
    otherwise(df['Registration State']))

In [254]:
df_grouped_RegistrationState = df.groupby('Registration State').agg({'Summons Number' : 'count'})
df_grouped_RegistrationState = df_grouped_RegistrationState.sort(['count(Summons Number)'], ascending = False)
df_grouped_RegistrationState.show()

+------------------+---------------------+
|Registration State|count(Summons Number)|
+------------------+---------------------+
|                NY|              8517686|
|                NJ|               925965|
|                PA|               285419|
|                FL|               144556|
|                CT|               141088|
|                MA|                85547|
|                IN|                80749|
|                VA|                72626|
|                MD|                61800|
|                NC|                55806|
|                IL|                37329|
|                GA|                36852|
|                TX|                36516|
|                AZ|                26426|
|                OH|                25302|
|                CA|                24260|
|                SC|                21836|
|                ME|                21574|
|                MN|                18227|
|                OK|                18165|
+----------

### '99' is now replaced by 'NY'

In [255]:
print(df.select(col('Registration State')).distinct().count())

66


### 2. Total number of unique states from where the cars that got parking tickets came : 66

## --------------------------------------------------------------------------------------------------  ##

## Aggregation tasks

In [256]:
# 1. frequency of the top five violation codes.

df_grouped_ViolationCode = df.groupby("Violation Code").agg({'Summons Number' : 'count'})
df_grouped_ViolationCode.show(5)

+--------------+---------------------+
|Violation Code|count(Summons Number)|
+--------------+---------------------+
|             7|               516395|
|            51|                61389|
|            54|                   11|
|            15|                   22|
|            11|                 7127|
+--------------+---------------------+
only showing top 5 rows



In [257]:
print('Top 5 Violation code are :')
df_grouped_ViolationCode.sort(['count(Summons Number)'], ascending = False).show(5)

Top 5 Violation code are :
+--------------+---------------------+
|Violation Code|count(Summons Number)|
+--------------+---------------------+
|            21|              1528588|
|            36|              1400614|
|            38|              1062304|
|            14|               893498|
|            20|               618593|
+--------------+---------------------+
only showing top 5 rows



In [258]:
# 2(a) Top 5 vehicle body type that gets ticket.
print ('Top 5 vehicle body type that gets ticket are :')
df_grouped_VehicleBodyType = df.groupby("Vehicle Body Type").agg({'Summons Number' : 'count'})
df_grouped_VehicleBodyType.sort(['count(Summons Number)'], ascending = False).show(5)

Top 5 vehicle body type that gets ticket are :
+-----------------+---------------------+
|Vehicle Body Type|count(Summons Number)|
+-----------------+---------------------+
|             SUBN|              3719802|
|             4DSD|              3082020|
|              VAN|              1411970|
|             DELV|               687330|
|              SDN|               438191|
+-----------------+---------------------+
only showing top 5 rows



In [259]:
# 2(a) Top 5 Vehicle make, that gets ticket.
print ('Top 5 Vehicle make, that gets ticket are :')
df_grouped_vehiclemake = df.groupby("Vehicle Make").agg({'Summons Number' : 'count'})
df_grouped_vehiclemake.sort(['count(Summons Number)'], ascending = False).show(5)

Top 5 Vehicle make, that gets ticket are :
+------------+---------------------+
|Vehicle Make|count(Summons Number)|
+------------+---------------------+
|        FORD|              1280958|
|       TOYOT|              1211451|
|       HONDA|              1079238|
|       NISSA|               918590|
|       CHEVR|               714655|
+------------+---------------------+
only showing top 5 rows



In [260]:
# 3(a) Top 5 Violation Precinct in terms of tickets.
print ('Top 5 Violation Precinct in terms of tickets :')
df_grouped_ViolationPrecinct = df.groupby("Violation Precinct").agg({'Summons Number' : 'count'})
df_grouped_ViolationPrecinct.sort(['count(Summons Number)'], ascending = False).show(5)
print('Ignoring Violation Precinct 0 (Error), fetching top 6:')
df_grouped_ViolationPrecinct.sort(['count(Summons Number)'], ascending = False).show(6)

Top 5 Violation Precinct in terms of tickets :
+------------------+---------------------+
|Violation Precinct|count(Summons Number)|
+------------------+---------------------+
|                 0|              2072400|
|                19|               535671|
|                14|               352450|
|                 1|               331810|
|                18|               306920|
+------------------+---------------------+
only showing top 5 rows

Ignoring Violation Precinct 0 (Error), fetching top 6:
+------------------+---------------------+
|Violation Precinct|count(Summons Number)|
+------------------+---------------------+
|                 0|              2072400|
|                19|               535671|
|                14|               352450|
|                 1|               331810|
|                18|               306920|
|               114|               296514|
+------------------+---------------------+
only showing top 6 rows



In [261]:
# 3(b) Top 5 Issuer Precinct in terms of tickets.
print ('Top 5 Issuer Precinct in terms of tickets :')
df_grouped_IssuerPrecinct = df.groupby("Issuer Precinct").agg({'Summons Number' : 'count'})
df_grouped_IssuerPrecinct.sort(['count(Summons Number)'], ascending = False).show(5)
print('Ignoring Violation Precinct 0 (Error), fetching top 6:')
df_grouped_IssuerPrecinct.sort(['count(Summons Number)'], ascending = False).show(6)

Top 5 Issuer Precinct in terms of tickets :
+---------------+---------------------+
|Issuer Precinct|count(Summons Number)|
+---------------+---------------------+
|              0|              2388479|
|             19|               521513|
|             14|               344977|
|              1|               321170|
|             18|               296553|
+---------------+---------------------+
only showing top 5 rows

Ignoring Violation Precinct 0 (Error), fetching top 6:
+---------------+---------------------+
|Issuer Precinct|count(Summons Number)|
+---------------+---------------------+
|              0|              2388479|
|             19|               521513|
|             14|               344977|
|              1|               321170|
|             18|               296553|
|            114|               289950|
+---------------+---------------------+
only showing top 6 rows



In [262]:
# 4. violation code frequencies for three precincts that have issued the most number of tickets( 19, 14 , 1).

df_filtered_19 = df.filter((col('Violation Precinct').cast('Int') == 19))
df_filtered_19_grouped = df_filtered_19.groupby('Violation Code').agg({'Summons Number' : 'count'})

df_filtered_14 = df.filter((col('Violation Precinct').cast('Int') == 14))
df_filtered_14_grouped = df_filtered_14.groupby('Violation Code').agg({'Summons Number' : 'count'})

df_filtered_1 = df.filter((col('Violation Precinct').cast('Int') == 1))
df_filtered_1_grouped = df_filtered_1.groupby('Violation Code').agg({'Summons Number' : 'count'})

### For Precinct 19

In [263]:
print('Frequency of different voilation code for Violation Precinct 19 :')
df_filtered_19_grouped.sort(['count(Summons Number)'], ascending = False).show(5)

Frequency of different voilation code for Violation Precinct 19 :
+--------------+---------------------+
|Violation Code|count(Summons Number)|
+--------------+---------------------+
|            46|                90530|
|            38|                74926|
|            37|                73359|
|            14|                58640|
|            21|                56516|
+--------------+---------------------+
only showing top 5 rows



### For Precinct 14

In [264]:
print('Frequency of different voilation code for Violation Precinct 14 :')
df_filtered_14_grouped.sort(['count(Summons Number)'], ascending = False).show(5)

Frequency of different voilation code for Violation Precinct 14 :
+--------------+---------------------+
|Violation Code|count(Summons Number)|
+--------------+---------------------+
|            14|                75850|
|            69|                58032|
|            31|                40150|
|            47|                31201|
|            42|                20666|
+--------------+---------------------+
only showing top 5 rows



### For Precinct 1

In [265]:
print('Frequency of different voilation code for Violation Precinct 1 :')
df_filtered_1_grouped.sort(['count(Summons Number)'], ascending = False).show(5)

Frequency of different voilation code for Violation Precinct 1 :
+--------------+---------------------+
|Violation Code|count(Summons Number)|
+--------------+---------------------+
|            14|                76375|
|            16|                39197|
|            20|                28768|
|            46|                23954|
|            38|                17139|
+--------------+---------------------+
only showing top 5 rows



### Voilation code 14 seems to be common among all  3 precincts in terms of maximum frequency of tickets. 

In [266]:
# 5.Analysing properties of parking violations across different times of the day

from pyspark.sql.functions import isnan, when, count, col, isnull

In [267]:
df.select([count(when(isnan(c), c)).alias(c) for c in df.columns]).show()

+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+
|Summons Number|Plate ID|Registration State|Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|
+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+
|             0|       0|                 0|         0|             0|                0|           0|                 0|              0|             0|
+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+



In [268]:
df.select([count(when(isnull(c), c)).alias(c) for c in df.columns]).show()

+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+
|Summons Number|Plate ID|Registration State|Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|
+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+
|             0|       0|                 0|         0|             0|                0|           0|                 0|              0|             0|
+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+



### No Null Values

In [269]:
df.select(col('Violation Time')).show()

+--------------+
|Violation Time|
+--------------+
|         0143A|
|         0400P|
|         0233P|
|         1120A|
|         0555P|
|         0852P|
|         0215A|
|         0758A|
|         1005A|
|         0845A|
|         0015A|
|         0707A|
|         1022A|
|         1150A|
|         0525A|
|         0645P|
|         1122A|
|         0256P|
|         1232A|
|         1034A|
+--------------+
only showing top 20 rows



In [270]:
from pyspark.sql.functions import substring, length, expr, col
from pyspark.sql.functions import regexp_extract 

df = df.withColumn('AM/PM', substring(col("Violation Time"), -1, 1))
df.show()


+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+-----+
|Summons Number|Plate ID|Registration State|Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|AM/PM|
+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+-----+
|    5092469481| GZH7067|                NY|2016-07-10|             7|             SUBN|       TOYOT|                 0|              0|         0143A|    A|
|    5092451658| GZH7067|                NY|2016-07-08|             7|             SUBN|       TOYOT|                 0|              0|         0400P|    P|
|    4006265037| FZX9232|                NY|2016-08-23|             5|             SUBN|        FORD|                 0|              0|         0233P|    P|
|    8478629828| 66623ME|                NY|2017-06-

In [271]:
df = df.withColumn("Violation Time",substring(df['Violation Time'],0, 4))
df.show()

+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+-----+
|Summons Number|Plate ID|Registration State|Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|AM/PM|
+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+-----+
|    5092469481| GZH7067|                NY|2016-07-10|             7|             SUBN|       TOYOT|                 0|              0|          0143|    A|
|    5092451658| GZH7067|                NY|2016-07-08|             7|             SUBN|       TOYOT|                 0|              0|          0400|    P|
|    4006265037| FZX9232|                NY|2016-08-23|             5|             SUBN|        FORD|                 0|              0|          0233|    P|
|    8478629828| 66623ME|                NY|2017-06-

In [272]:
from pyspark.sql.functions import to_timestamp

df = df.withColumn("Violation Time",to_timestamp(df['Violation Time'], 'hhmm'))
df.show()

+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+-------------------+-----+
|Summons Number|Plate ID|Registration State|Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|     Violation Time|AM/PM|
+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+-------------------+-----+
|    5092469481| GZH7067|                NY|2016-07-10|             7|             SUBN|       TOYOT|                 0|              0|1970-01-01 01:43:00|    A|
|    5092451658| GZH7067|                NY|2016-07-08|             7|             SUBN|       TOYOT|                 0|              0|1970-01-01 04:00:00|    P|
|    4006265037| FZX9232|                NY|2016-08-23|             5|             SUBN|        FORD|                 0|              0|1970-01-01 02:33:00|    P|
|    8478629828| 66623

In [273]:
from pyspark.sql import functions as F

df = df.withColumn("Interval", 
   F.when((df['Violation Time'].between('1970-01-01 04:00:00', '1970-01-01 08:00:00')) & (df['AM/PM'] == 'A'), 'Early Morning')
    .when((df['Violation Time'].between('1970-01-01 08:00:00', '1970-01-01 00:00:00')) & (df['AM/PM'] == 'A'), 'Morning')
    .when((df['Violation Time'].between('1970-01-01 00:00:00', '1970-01-01 04:00:00')) & (df['AM/PM'] == 'P'), 'Noon')
    .when((df['Violation Time'].between('1970-01-01 04:00:00', '1970-01-01 08:00:00')) & (df['AM/PM'] == 'P'), 'Evening')
    .when((df['Violation Time'].between('1970-01-01 08:00:00', '1970-01-01 00:00:00')) & (df['AM/PM'] == 'P'), 'Night')
    .otherwise('Late Night'))
df.show()

+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+-------------------+-----+-------------+
|Summons Number|Plate ID|Registration State|Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|     Violation Time|AM/PM|     Interval|
+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+-------------------+-----+-------------+
|    5092469481| GZH7067|                NY|2016-07-10|             7|             SUBN|       TOYOT|                 0|              0|1970-01-01 01:43:00|    A|   Late Night|
|    5092451658| GZH7067|                NY|2016-07-08|             7|             SUBN|       TOYOT|                 0|              0|1970-01-01 04:00:00|    P|         Noon|
|    4006265037| FZX9232|                NY|2016-08-23|             5|             SUBN|        FORD|              

In [275]:
df_grouped = df.groupby('Interval').agg({'Summons Number' : 'count'})
df_grouped.show()

Py4JJavaError: An error occurred while calling o3867.showString.
: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange hashpartitioning(Interval#4998, 200)
+- *(1) HashAggregate(keys=[Interval#4998], functions=[partial_count(Summons Number#4245)], output=[Interval#4998, count#5110L])
   +- *(1) Project [Summons Number#4245, CASE WHEN (((cast(cast(unix_timestamp(substring(Violation Time#4254, 0, 4), hhmm, Some(UTC)) as timestamp) as string) >= 1970-01-01 04:00:00) && (cast(cast(unix_timestamp(substring(Violation Time#4254, 0, 4), hhmm, Some(UTC)) as timestamp) as string) <= 1970-01-01 08:00:00)) && (substring(Violation Time#4254, -1, 1) = A)) THEN Early Morning WHEN (((cast(cast(unix_timestamp(substring(Violation Time#4254, 0, 4), hhmm, Some(UTC)) as timestamp) as string) >= 1970-01-01 08:00:00) && (cast(cast(unix_timestamp(substring(Violation Time#4254, 0, 4), hhmm, Some(UTC)) as timestamp) as string) <= 1970-01-01 00:00:00)) && (substring(Violation Time#4254, -1, 1) = A)) THEN Morning WHEN (((cast(cast(unix_timestamp(substring(Violation Time#4254, 0, 4), hhmm, Some(UTC)) as timestamp) as string) >= 1970-01-01 00:00:00) && (cast(cast(unix_timestamp(substring(Violation Time#4254, 0, 4), hhmm, Some(UTC)) as timestamp) as string) <= 1970-01-01 04:00:00)) && (substring(Violation Time#4254, -1, 1) = P)) THEN Noon WHEN (((cast(cast(unix_timestamp(substring(Violation Time#4254, 0, 4), hhmm, Some(UTC)) as timestamp) as string) >= 1970-01-01 04:00:00) && (cast(cast(unix_timestamp(substring(Violation Time#4254, 0, 4), hhmm, Some(UTC)) as timestamp) as string) <= 1970-01-01 08:00:00)) && (substring(Violation Time#4254, -1, 1) = P)) THEN Evening WHEN (((cast(cast(unix_timestamp(substring(Violation Time#4254, 0, 4), hhmm, Some(UTC)) as timestamp) as string) >= 1970-01-01 08:00:00) && (cast(cast(unix_timestamp(substring(Violation Time#4254, 0, 4), hhmm, Some(UTC)) as timestamp) as string) <= 1970-01-01 00:00:00)) && (substring(Violation Time#4254, -1, 1) = P)) THEN Night ELSE Late Night END AS Interval#4998]
      +- *(1) FileScan csv [Summons Number#4245,Violation Time#4254] Batched: false, Format: CSV, Location: InMemoryFileIndex[hdfs://nameservice1/common_folder/nyc_parking/Parking_Violations_Issued_-_Fisca..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Summons Number:string,Violation Time:string>

	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:374)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:151)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:610)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:339)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3384)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2545)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2545)
	at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3365)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3364)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2545)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2759)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:255)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:292)
	at sun.reflect.GeneratedMethodAccessor75.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
This stopped SparkContext was created at:

org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
java.lang.reflect.Constructor.newInstance(Constructor.java:423)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
py4j.Gateway.invoke(Gateway.java:238)
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
py4j.GatewayConnection.run(GatewayConnection.java:238)
java.lang.Thread.run(Thread.java:748)

The currently active SparkContext was created at:

(No active SparkContext.)
         
	at org.apache.spark.SparkContext.assertNotStopped(SparkContext.scala:100)
	at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1486)
	at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.buildReader(CSVFileFormat.scala:103)
	at org.apache.spark.sql.execution.datasources.FileFormat$class.buildReaderWithPartitionValues(FileFormat.scala:129)
	at org.apache.spark.sql.execution.datasources.TextBasedFileFormat.buildReaderWithPartitionValues(FileFormat.scala:165)
	at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:312)
	at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:310)
	at org.apache.spark.sql.execution.FileSourceScanExec.inputRDDs(DataSourceScanExec.scala:330)
	at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:151)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:610)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:92)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
	... 41 more
