In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType
import time as t
from pyspark.sql.functions import *

In [2]:
# Create (or reuse, if one already exists) the SparkSession used by every
# cell below; the application is registered under the name 'Basics'.
spark = (
    SparkSession.builder
    .appName('Basics')
    .getOrCreate()
)

In [3]:
# Bare expression: the notebook renders the SparkSession object so the reader
# can confirm the session was created.
spark

## Reading Data with and without defining a Schema

### Exec time without defining a Schema

In [4]:
# Time a CSV load that asks Spark to infer the column types
# (inferSchema=True); header=True takes the column names from the first row.
#
# perf_counter() is a monotonic, high-resolution clock intended for measuring
# intervals; time() is wall-clock time and can jump if the system clock is
# adjusted while the cell is running.
#
# NOTE(review): the path is an absolute, machine-specific location — consider
# moving it to a configurable data directory so the notebook runs elsewhere.
t0 = t.perf_counter()
firedep = spark.read.csv('/Users/furqan/Python/Spark_Temp/Fire_Department_Calls_for_Service.csv',
                         header=True, inferSchema=True)
t1 = t.perf_counter()
print('Exec time is:', t1 - t0)

Exec time is: 10.102001905441284


## Get Column Names

In [5]:
# List the column names of the DataFrame loaded above (as read from the CSV
# header row).
firedep.columns

['Call Number',
 'Unit ID',
 'Incident Number',
 'Call Type',
 'Call Date',
 'Watch Date',
 'Received DtTm',
 'Entry DtTm',
 'Dispatch DtTm',
 'Response DtTm',
 'On Scene DtTm',
 'Transport DtTm',
 'Hospital DtTm',
 'Call Final Disposition',
 'Available DtTm',
 'Address',
 'City',
 'Zipcode of Incident',
 'Battalion',
 'Station Area',
 'Box',
 'Original Priority',
 'Priority',
 'Final Priority',
 'ALS Unit',
 'Call Type Group',
 'Number of Alarms',
 'Unit Type',
 'Unit sequence in call dispatch',
 'Fire Prevention District',
 'Supervisor District',
 'Neighborhooods - Analysis Boundaries',
 'Location',
 'RowID']

In [6]:
# Explicit schema for the fire-department calls CSV: one entry per column, in
# file order, each declared nullable. Field names are the header names with
# spaces removed; the date/time columns are kept as plain strings.
fireSchema = (
    StructType()
    .add('CallNumber', IntegerType(), True)
    .add('UnitID', StringType(), True)
    .add('IncidentNumber', IntegerType(), True)
    .add('CallType', StringType(), True)
    .add('CallDate', StringType(), True)
    .add('WatchDate', StringType(), True)
    .add('ReceivedDtTm', StringType(), True)
    .add('EntryDtTm', StringType(), True)
    .add('DispatchDtTm', StringType(), True)
    .add('ResponseDtTm', StringType(), True)
    .add('OnSceneDtTm', StringType(), True)
    .add('TransportDtTm', StringType(), True)
    .add('HospitalDtTm', StringType(), True)
    .add('CallFinalDisposition', StringType(), True)
    .add('AvailableDtTm', StringType(), True)
    .add('Address', StringType(), True)
    .add('City', StringType(), True)
    .add('ZipcodeOfIncident', IntegerType(), True)
    .add('Battalion', StringType(), True)
    .add('StationArea', StringType(), True)
    .add('Box', StringType(), True)
    .add('OriginalPriority', StringType(), True)
    .add('Priority', StringType(), True)
    .add('FinalPriority', IntegerType(), True)
    .add('ALSUnit', BooleanType(), True)
    .add('CallTypeGroup', StringType(), True)
    .add('NumberOfAlarms', IntegerType(), True)
    .add('UnitType', StringType(), True)
    .add('UnitSequenceInCallDispatch', IntegerType(), True)
    .add('FirePreventionDistrict', StringType(), True)
    .add('SupervisorDistrict', StringType(), True)
    .add('NeighborhooodsDistrict', StringType(), True)
    .add('Location', StringType(), True)
    .add('RowID', StringType(), True)
)

### Exec time after defining Schema

In [7]:
# Time the same CSV load, this time supplying the column types through
# fireSchema instead of asking Spark to infer them.
#
# perf_counter() is a monotonic, high-resolution clock intended for measuring
# intervals; time() is wall-clock time and can jump if the system clock is
# adjusted while the cell is running.
#
# NOTE(review): with an explicit schema the reader call presumably returns
# before any rows are read (Spark evaluates lazily), so this figure measures
# plan construction rather than a full pass over the file — confirm before
# comparing it directly with the inferSchema timing.
# NOTE(review): the path is an absolute, machine-specific location — consider
# moving it to a configurable data directory so the notebook runs elsewhere.
t0 = t.perf_counter()
firedep = spark.read.csv('/Users/furqan/Python/Spark_Temp/Fire_Department_Calls_for_Service.csv',
                         header=True, schema=fireSchema)
t1 = t.perf_counter()
print('Exec time is:', t1 - t0)

Exec time is: 0.07374310493469238


In [8]:
'''
http://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=show#pyspark.sql.DataFrame.show
truncate – If set to True, truncate strings longer than 20 chars by default. If set to a number greater than one, 
truncates long strings to length truncate and align cells right.
'''
# Project two columns and print a single row; truncate=False shows the cell
# values in full instead of cutting long strings.
firedep.select('CallNumber', 'UnitID').show(n=1, truncate=False)

+----------+------+
|CallNumber|UnitID|
+----------+------+
|192910017 |E11   |
+----------+------+
only showing top 1 row



## Convert Spark DF to Pandas DF

In [9]:
# Restrict the Spark DataFrame to 20 rows before converting, so only a small
# sample is turned into a pandas DataFrame for display.
firedep.limit(20).toPandas()

Unnamed: 0,CallNumber,UnitID,IncidentNumber,CallType,CallDate,WatchDate,ReceivedDtTm,EntryDtTm,DispatchDtTm,ResponseDtTm,...,ALSUnit,CallTypeGroup,NumberOfAlarms,UnitType,UnitSequenceInCallDispatch,FirePreventionDistrict,SupervisorDistrict,NeighborhooodsDistrict,Location,RowID
0,192910017,E11,19125164,Alarms,10/18/2019,10/17/2019,10/18/2019 12:03:52 AM,10/18/2019 12:06:59 AM,10/18/2019 12:07:05 AM,10/18/2019 12:08:28 AM,...,True,Alarm,1,ENGINE,1,6,9,Mission,"(37.75210364574824, -122.42066480228367)",192910017-E11
1,192910018,B10,19125165,Alarms,10/18/2019,10/17/2019,10/18/2019 12:05:56 AM,10/18/2019 12:07:27 AM,10/18/2019 12:09:49 AM,,...,False,Alarm,1,CHIEF,1,6,9,Mission,"(37.75368162954947, -122.4202535645237)",192910018-B10
2,192910018,T07,19125165,Alarms,10/18/2019,10/17/2019,10/18/2019 12:05:56 AM,10/18/2019 12:07:27 AM,10/18/2019 12:09:49 AM,,...,False,Alarm,1,TRUCK,3,6,9,Mission,"(37.75368162954947, -122.4202535645237)",192910018-T07
3,192910025,B04,19125166,Alarms,10/18/2019,10/17/2019,10/18/2019 12:09:02 AM,10/18/2019 12:09:02 AM,10/18/2019 12:09:02 AM,10/18/2019 12:09:02 AM,...,False,Alarm,1,CHIEF,1,4,2,Marina,"(37.80034056356869, -122.43607739030332)",192910025-B04
4,192910034,E01,19125167,Structure Fire,10/18/2019,10/17/2019,10/18/2019 12:12:39 AM,10/18/2019 12:12:39 AM,10/18/2019 12:12:48 AM,10/18/2019 12:13:52 AM,...,True,Alarm,1,ENGINE,1,2,6,South of Market,"(37.779211684542084, -122.41093657380038)",192910034-E01
5,192910034,T01,19125167,Structure Fire,10/18/2019,10/17/2019,10/18/2019 12:12:39 AM,10/18/2019 12:12:39 AM,10/18/2019 12:12:48 AM,10/18/2019 12:14:28 AM,...,False,Alarm,1,TRUCK,2,2,6,South of Market,"(37.779211684542084, -122.41093657380038)",192910034-T01
6,192910039,76,19125168,Medical Incident,10/18/2019,10/17/2019,10/18/2019 12:14:32 AM,10/18/2019 12:14:32 AM,10/18/2019 12:15:10 AM,10/18/2019 12:15:25 AM,...,True,Non Life-threatening,1,MEDIC,1,10,10,Bayview Hunters Point,"(37.73607882495912, -122.38972310330021)",192910039-76
7,192910048,T08,19125169,Alarms,10/18/2019,10/17/2019,10/18/2019 12:20:25 AM,10/18/2019 12:21:44 AM,10/18/2019 12:21:51 AM,10/18/2019 12:24:30 AM,...,False,Alarm,1,TRUCK,3,3,6,Mission Bay,"(37.77663138541027, -122.3921894505535)",192910048-T08
8,192910057,78,19125170,Medical Incident,10/18/2019,10/17/2019,10/18/2019 12:23:58 AM,10/18/2019 12:26:34 AM,10/18/2019 12:26:42 AM,10/18/2019 12:27:20 AM,...,True,Potentially Life-Threatening,1,MEDIC,2,9,7,West of Twin Peaks,"(37.736080487699894, -122.44882464734825)",192910057-78
9,192910057,E15,19125170,Medical Incident,10/18/2019,10/17/2019,10/18/2019 12:23:58 AM,10/18/2019 12:26:34 AM,10/18/2019 12:26:42 AM,10/18/2019 12:29:33 AM,...,True,Potentially Life-Threatening,1,ENGINE,1,9,7,West of Twin Peaks,"(37.736080487699894, -122.44882464734825)",192910057-E15


## Get table Schema

In [10]:
# Print the DataFrame's schema (column name, type, nullability) as a tree.
firedep.printSchema()

root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallDate: string (nullable = true)
 |-- WatchDate: string (nullable = true)
 |-- ReceivedDtTm: string (nullable = true)
 |-- EntryDtTm: string (nullable = true)
 |-- DispatchDtTm: string (nullable = true)
 |-- ResponseDtTm: string (nullable = true)
 |-- OnSceneDtTm: string (nullable = true)
 |-- TransportDtTm: string (nullable = true)
 |-- HospitalDtTm: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- ZipcodeOfIncident: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPr

## Describe DataFrame

In [11]:
# Summary statistics (count, mean, stddev, min, max) for two numeric columns.
firedep.describe('CallNumber', 'NumberOfAlarms').show()

+-------+--------------------+-------------------+
|summary|          CallNumber|     NumberOfAlarms|
+-------+--------------------+-------------------+
|  count|             5144933|            5144933|
|   mean|1.0452980439042258E8| 1.0054086224252095|
| stddev| 5.694029816593383E7|0.09963311637299194|
|    min|             1030101|                  1|
|    max|           193414308|                  5|
+-------+--------------------+-------------------+



In [12]:
#%%time
#firedep.describe()

### Filter DataFrame to get Column or DataFrame object

#### Column Object

In [13]:
# Indexing the DataFrame with a single column name yields a Column object,
# as the displayed type shows.
type(firedep['CallNumber'])

pyspark.sql.column.Column

#### DataFrame Object

In [14]:
# select() with one column name still yields a DataFrame (not a Column),
# as the displayed type shows.
type(firedep.select('CallNumber'))

pyspark.sql.dataframe.DataFrame

#### Get Multiple Columns

In [15]:
# Picking several columns at once produces a new DataFrame holding just those
# columns.
firedep.select('CallNumber', 'NumberOfAlarms')

DataFrame[CallNumber: int, NumberOfAlarms: int]

## Get DF Rows

In [16]:
# head(5) returns a list of Row objects; index 0 picks the first of them.
firedep.select('CallNumber', 'NumberOfAlarms').head(5)[0]

Row(CallNumber=192910017, NumberOfAlarms=1)

## Drop NA Values

In [17]:
# Load the small sample file used for the null-handling examples; the first
# row supplies the column names. No schema is given or inferred here.
null_df = spark.read.option('header', True).csv('NullData.csv')

In [18]:
# Drop every row that contains at least one null value, then print the rest.
null_df.dropna(how='any').show()

+----+-----+-----+------+
|Acct|month|Debit|Credit|
+----+-----+-----+------+
|   A|    1|  100|   200|
|   A|    2|  200|   200|
|   A|    3|  300|    10|
|   B|    1|   10|   200|
|   C|    1| 1000|   100|
+----+-----+-----+------+



### Drop NA with threshold

In [19]:
# A row is kept only if it has at least 2 non-null values; rows with fewer
# are dropped.
null_df.dropna(thresh=2).show()

+----+-----+-----+------+
|Acct|month|Debit|Credit|
+----+-----+-----+------+
|   A|    1|  100|   200|
|   A|    2|  200|   200|
|   A|    3|  300|    10|
|   B|    1|   10|   200|
|   B|    3|   20|  null|
|   C|    1| 1000|   100|
|   C|    2|   10|  null|
|   C|    3| null|  null|
+----+-----+-----+------+



In [20]:
# Only look at the 'Debit' column when deciding which rows to drop: rows with
# a null Debit are removed, nulls in other columns are tolerated.
null_df.dropna(subset=['Debit']).show()

+----+-----+-----+------+
|Acct|month|Debit|Credit|
+----+-----+-----+------+
|   A|    1|  100|   200|
|   A|    2|  200|   200|
|   A|    3|  300|    10|
|   B|    1|   10|   200|
|   B|    3|   20|  null|
|   C|    1| 1000|   100|
|   C|    2|   10|  null|
+----+-----+-----+------+



In [21]:
# Replace the 'Debit' column with an integer-typed version of itself so it can
# be aggregated numerically later.
converted_null_df = null_df.withColumn(
    'Debit',
    null_df.Debit.cast('int'),
)

In [22]:
# Same conversion for 'Credit': overwrite the column with its integer cast.
converted_null_df = converted_null_df.withColumn(
    'Credit',
    converted_null_df.Credit.cast('int'),
)

## Pivot Data

In [23]:
# One row per account, one column per distinct month value, each cell holding
# the summed Debit for that account/month.
# The pivot column is spelled 'month' exactly as it appears in the CSV header;
# the previous 'Month' only resolved because Spark matches column names
# case-insensitively by default and would fail with case-sensitive resolution.
pvt_df = converted_null_df.groupby('Acct').pivot('month').sum('Debit')

In [24]:
# Print the pivoted Debit totals (accounts as rows, months as columns).
pvt_df.show()

+----+----+----+----+
|Acct|   1|   2|   3|
+----+----+----+----+
|   B|  10|null|  20|
|   C|1000|  10|null|
|   A| 100| 200| 300|
+----+----+----+----+



In [25]:
# Pivot on month again, this time summing both Debit and Credit, which yields
# two generated columns per month value.
# The pivot column is spelled 'month' exactly as it appears in the CSV header;
# the previous 'Month' only resolved because Spark matches column names
# case-insensitively by default and would fail with case-sensitive resolution.
pivoted_data = converted_null_df.groupby('Acct').pivot('month').sum('Debit', 'Credit')

In [35]:
# Inspect the generated column names of the two-aggregate pivot; they are
# needed verbatim for the renaming step that follows.
pivoted_data.printSchema()

root
 |-- Acct: string (nullable = true)
 |-- 1_sum(CAST(Debit AS BIGINT)): long (nullable = true)
 |-- 1_sum(CAST(Credit AS BIGINT)): long (nullable = true)
 |-- 2_sum(CAST(Debit AS BIGINT)): long (nullable = true)
 |-- 2_sum(CAST(Credit AS BIGINT)): long (nullable = true)
 |-- 3_sum(CAST(Debit AS BIGINT)): long (nullable = true)
 |-- 3_sum(CAST(Credit AS BIGINT)): long (nullable = true)



## Column Renaming

In [27]:
## Renaming columns in above df
# https://sparkbyexamples.com/spark/rename-a-column-on-spark-dataframes/
def _short_pivot_name(name):
    """Map a generated pivot column name to its short form, e.g. 'M1D'.

    A pivot output column looks like '<month>_<aggregate expression>', where
    the aggregate expression mentions either 'Debit' or 'Credit'. The result
    is 'M' + month + 'D' (Debit) or 'C' (Credit). Names that do not follow
    this pattern, such as the grouping key 'Acct', are returned unchanged.
    """
    month, separator, aggregate = name.partition('_')
    if not separator:
        return name
    for source, suffix in (('Debit', 'D'), ('Credit', 'C')):
        if source in aggregate:
            return 'M' + month + suffix
    return name


# withColumnRenamed silently does nothing when the old name is not present, so
# hard-coding the generated names (e.g. '1_sum(CAST(Debit AS BIGINT))') would
# leave the columns un-renamed if Spark ever spells them differently. Deriving
# the new names from the columns that actually exist avoids that.
renamed_conv_null_df = pivoted_data
for old_name in pivoted_data.columns:
    renamed_conv_null_df = renamed_conv_null_df.withColumnRenamed(
        old_name, _short_pivot_name(old_name))

In [28]:
# Print the pivoted data with its shortened column names.
renamed_conv_null_df.show()

+----+----+---+----+----+----+----+
|Acct| M1D|M1C| M2D| M2C| M3D| M3C|
+----+----+---+----+----+----+----+
|   B|  10|200|null|null|  20|null|
|   C|1000|100|  10|null|null|null|
|   A| 100|200| 200| 200| 300|  10|
+----+----+---+----+----+----+----+



## Create new column and fillna

In [29]:
# Replace nulls in the two credit columns (M2C -> 0, M3C -> 1) and then add
# 'Ratio' as M2C divided by M3C. The filled values are persisted in the
# resulting DataFrame.
renamed_conv_null_df = (
    renamed_conv_null_df
    .fillna({'M2C': 0, 'M3C': 1})
    .withColumn('Ratio', col('M2C') / col('M3C'))
)

In [30]:
# Print the result: M2C/M3C no longer contain nulls and 'Ratio' is present.
renamed_conv_null_df.show()

+----+----+---+----+---+----+---+-----+
|Acct| M1D|M1C| M2D|M2C| M3D|M3C|Ratio|
+----+----+---+----+---+----+---+-----+
|   B|  10|200|null|  0|  20|  1|  0.0|
|   C|1000|100|  10|  0|null|  1|  0.0|
|   A| 100|200| 200|200| 300| 10| 20.0|
+----+----+---+----+---+----+---+-----+



## Create new column, substituting nulls with coalesce (source columns unchanged)

In [31]:
# Recompute 'Ratio' as M2D / M3D, substituting 0 for a null numerator and 1
# for a null denominator only inside the expression; the M2D and M3D columns
# themselves keep their nulls.
renamed_conv_null_df = renamed_conv_null_df.withColumn(
    'Ratio',
    coalesce(col('M2D'), lit(0)) / coalesce(col('M3D'), lit(1)),
)

In [32]:
# Print the result: 'Ratio' now reflects the debit columns, while M2D/M3D
# still show their original nulls.
renamed_conv_null_df.show()

+----+----+---+----+---+----+---+------------------+
|Acct| M1D|M1C| M2D|M2C| M3D|M3C|             Ratio|
+----+----+---+----+---+----+---+------------------+
|   B|  10|200|null|  0|  20|  1|               0.0|
|   C|1000|100|  10|  0|null|  1|              10.0|
|   A| 100|200| 200|200| 300| 10|0.6666666666666666|
+----+----+---+----+---+----+---+------------------+

