In [34]:
from pyspark.sql import SparkSession

spark = (SparkSession.builder
        .appName("mysparksession")
        .getOrCreate()
         )
        

In [35]:
from pyspark.sql.types import *

In [36]:
schema = StructType([StructField('CallNumber', IntegerType(), True),
                    StructField('UnitID', StringType(), True),
                    StructField('IncidentNumber', IntegerType(), True),
                    StructField('CallType', StringType(), True),
                    StructField('CallDate', StringType(), True),
                    StructField('WatchDate', StringType(), True),
                    StructField('CallFinalDisposition', StringType(), True),
                    StructField('AvailableDtTm', StringType(), True),
                    StructField('Address', StringType(), True),
                    StructField('City', StringType(), True),
                    StructField('Zipcode', IntegerType(), True),
                    StructField('Battalion', StringType(), True),
                    StructField('StationArea', StringType(), True),
                    StructField('Box', StringType(), True),
                    StructField('OriginalPriority', StringType(), True),
                    StructField('Priority', StringType(), True),
                    StructField('FinalPriority', IntegerType(), True),
                    StructField('ALSUnit', BooleanType(), True),
                    StructField('CallTypeGroup', StringType(), True),
                    StructField('NumAlarms', IntegerType(), True),
                    StructField('UnitType', StringType(), True),
                    StructField('UnitSequenceInCallDispatch', IntegerType(), True),
                    StructField('FirePreventionDistrict', StringType(), True),
                    StructField('SupervisorDistrict', StringType(), True),
                    StructField('Neighborhood', StringType(), True),
                    StructField('Location', StringType(), True),
                    StructField('RowID', StringType(), True),
                    StructField('Delay', FloatType(), True)])

In [37]:
data_file = '/Users/saurabhverma/LearningSparkV2/chapter3/data/sf-fire-calls.csv'

fire_df = spark.read.csv(data_file, header=True, schema=schema) 

In [38]:
fire_df.show(10)

+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------+--------------------------+----------------------+------------------+--------------------+--------------------+-------------+---------+
|CallNumber|UnitID|IncidentNumber|        CallType|  CallDate| WatchDate|CallFinalDisposition|       AvailableDtTm|             Address|City|Zipcode|Battalion|StationArea| Box|OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|        Neighborhood|            Location|        RowID|    Delay|
+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+------------

In [39]:
path_to_save='/Users/saurabhverma/Spark-Training/Chapter 3/fire_sf_data'

fire_df.write.format("parquet").save(path_to_save)

AnalysisException: [PATH_ALREADY_EXISTS] Path file:/Users/saurabhverma/Spark-Training/Chapter 3/fire_sf_data already exists. Set mode as "overwrite" to overwrite the existing path.

In [None]:
path_to_save='/Users/saurabhverma/Spark-Training/Chapter 3/fire_sf_data_csv'

fire_df.write.format("csv").save(path_to_save)

                                                                                

In [None]:
path_to_save='/Users/saurabhverma/Spark-Training/Chapter 3/fire_sf_data_csv_1'

fire_df.coalesce(1).write.format("csv").save(path_to_save)

                                                                                

Analysing the data

In [None]:
fire_df.show(10)

+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------+--------------------------+----------------------+------------------+--------------------+--------------------+-------------+---------+
|CallNumber|UnitID|IncidentNumber|        CallType|  CallDate| WatchDate|CallFinalDisposition|       AvailableDtTm|             Address|City|Zipcode|Battalion|StationArea| Box|OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|        Neighborhood|            Location|        RowID|    Delay|
+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+------------

In [None]:
#filtering the data based on some criteria

In [None]:
from pyspark.sql.functions import col
few_fire_df = (fire_df
               .select('IncidentNumber', 'AvailableDtTm', 'CallType')
               .where(col('CallType') != 'Alarms')
               )

few_fire_df.show(10, truncate=False)

+--------------+----------------------+----------------+
|IncidentNumber|AvailableDtTm         |CallType        |
+--------------+----------------------+----------------+
|2003235       |01/11/2002 01:51:44 AM|Structure Fire  |
|2003241       |01/11/2002 03:01:18 AM|Medical Incident|
|2003242       |01/11/2002 02:39:50 AM|Medical Incident|
|2003250       |01/11/2002 04:16:46 AM|Vehicle Fire    |
|2003279       |01/11/2002 08:03:26 AM|Structure Fire  |
|2003343       |01/11/2002 12:06:57 PM|Medical Incident|
|2003348       |01/11/2002 01:08:40 PM|Medical Incident|
|2003381       |01/11/2002 03:31:02 PM|Medical Incident|
|2003382       |01/11/2002 02:59:04 PM|Structure Fire  |
|2003399       |01/11/2002 04:22:49 PM|Medical Incident|
+--------------+----------------------+----------------+
only showing top 10 rows



In [None]:
#finding the types of call types and count of incidents for them

In [None]:
from pyspark.sql.functions import countDistinct

grouped_call_type = (fire_df
                     .select('CallNumber','IncidentNumber','CallType')
                     .groupBy('CallType')
                     .agg(countDistinct(col('IncidentNumber')).alias('IncidentNumberCount')
                          ,countDistinct('CallNumber').alias( 'CallNumberCount' )
                     )
                     .orderBy('IncidentNumberCount', ascending= False)
                     )

In [None]:
grouped_call_type.show(10)

+--------------------+-------------------+---------------+
|            CallType|IncidentNumberCount|CallNumberCount|
+--------------------+-------------------+---------------+
|    Medical Incident|             111097|         111097|
|      Structure Fire|              20724|          20724|
|              Alarms|              18685|          18685|
|   Traffic Collision|               6722|           6722|
|Citizen Assist / ...|               2503|           2503|
|               Other|               2113|           2113|
|        Outside Fire|               2034|           2034|
|        Vehicle Fire|                821|            821|
|Gas Leak (Natural...|                735|            735|
|        Water Rescue|                621|            621|
+--------------------+-------------------+---------------+
only showing top 10 rows



In [None]:
#Rename a column

new_fire_df = fire_df.withColumnRenamed('Delay', 'DelayinMinutes')

new_fire_df[['DelayinMinutes']].show(10)

+--------------+
|DelayinMinutes|
+--------------+
|          2.95|
|           4.7|
|     2.4333334|
|           1.5|
|     3.4833333|
|          1.75|
|     2.7166667|
|     1.7833333|
|     1.5166667|
|     2.7666667|
+--------------+
only showing top 10 rows



In [None]:
#Converting string time to timestamp

In [40]:
from pyspark.sql.functions import to_timestamp, col

fire_df_tramsformed = (fire_df
                       .withColumn('IncidentDate',to_timestamp(col('CallDate'),'MM/dd/yyyy' ))
                       .drop('CallDate')
                       .withColumn('OnWatchDate',to_timestamp(col('WatchDate'),'MM/dd/yyyy' ))
                       .drop('WatchDate')
                       .withColumn('AvailableDtTS',to_timestamp(col('AvailableDtTm'),'MM/dd/yyyy hh:mm:ss a' ))
                       .drop('AvailableDtTm')
                       

                       )

In [41]:
fire_df_tramsformed.show(5)

+----------+------+--------------+----------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------+--------------------------+----------------------+------------------+--------------------+--------------------+-------------+---------+-------------------+-------------------+-------------------+
|CallNumber|UnitID|IncidentNumber|        CallType|CallFinalDisposition|             Address|City|Zipcode|Battalion|StationArea| Box|OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|        Neighborhood|            Location|        RowID|    Delay|       IncidentDate|        OnWatchDate|      AvailableDtTS|
+----------+------+--------------+----------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------

In [33]:
# DataFrame Methods
# 1. Data Selection and Transformation

# select(*cols): Selects specific columns.
# selectExpr(*exprs): Selects columns using SQL expressions.
# withColumn(colName, col): Adds a new column or replaces an existing column with the same name.
# withColumnRenamed(existing, new): Renames an existing column.
# drop(*cols): Drops specified columns from the DataFrame.
# **filter(condition) or where(condition): Filters rows based on a condition.
# distinct(): Removes duplicate rows.
# 2. Aggregation and Grouping

# groupBy(*cols): Groups rows by one or more columns.
# agg(*exprs): Aggregates data using one or more aggregation functions.
# count(): Counts the number of rows in the DataFrame.
# sum(*cols): Computes the sum of numeric columns.
# avg(*cols): Computes the average of numeric columns.
# min(*cols): Computes the minimum value of columns.
# max(*cols): Computes the maximum value of columns.
# pivot(pivot_col): Pivots a DataFrame around a specified column.
# 3. Joins and Merges

# join(other, on=None, how=None): Joins with another DataFrame.
# crossJoin(other): Performs a cross join with another DataFrame.
# 4. Data Inspection and Exploration

# show(n=20, truncate=True): Displays the top n rows of the DataFrame.
# printSchema(): Prints the schema of the DataFrame.
# describe(*cols): Computes statistics for numeric columns.
# head(n=1): Returns the first n rows as a list of Row objects.
# first(): Returns the first row as a Row object.
# 5. Data Repartitioning and Sampling

# repartition(numPartitions, *cols): Repartitions the DataFrame into a specified number of partitions.
# coalesce(numPartitions): Reduces the number of partitions in the DataFrame.
# sample(withReplacement, fraction, seed=None): Samples a fraction of the DataFrame.
# 6. Ordering and Sorting

# orderBy(*cols, ascending=True): Sorts the DataFrame by one or more columns.
# sort(*cols, ascending=True): Alias for orderBy.
# 7. Data Conversion and Export

# toPandas(): Converts the DataFrame to a Pandas DataFrame.
# write.format(source): Writes the DataFrame to an external data source (e.g., CSV, Parquet).
# 8. Schema and Metadata

# schema(): Returns the schema of the DataFrame.
# columns: Returns a list of column names.
# 9. UDF and SQL Operations

# registerTempTable(name): Registers a DataFrame as a temporary table for SQL queries.
# createOrReplaceTempView(name): Creates or replaces a temporary view using the DataFrame.
# sql(query): Executes SQL queries against a temporary view or table.
# 10. DataFrame Creation and Manipulation

# alias(alias): Gives a DataFrame an alias.
# cache(): Caches the DataFrame in memory.
# unpersist(): Removes the DataFrame from cache.
# fillna(value, subset=None): Replaces null values with the specified value.

In [43]:
# https://www.youtube.com/watch?v=71ntq5LImRc

24/08/14 02:50:00 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 1039221 ms exceeds timeout 120000 ms
24/08/14 02:50:00 WARN SparkContext: Killing executors is not supported by current scheduler.
24/08/14 03:05:33 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:101)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:85)
	at org.apache.spark.storage.BlockManagerMaster.registerBlockManager(BlockManagerMaster.scala:80)
	at org.apache.spark.storage.BlockManager.reregister(BlockManager.scala:642)
	at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1223)
	at 