In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg

In [2]:
# Create a DataFrame using SparkSession
spark = (SparkSession
         .builder.appName("AuthorsAges").getOrCreate())
data_df = spark.createDataFrame([("Brooke", 20), ("Denny", 31), ("Jules", 30), 
  ("TD", 35), ("Brooke", 25)], ["name", "age"])
data_df.show()
avg_age = data_df.groupBy("name").agg(avg("age"))
avg_age.show()

+------+---+
|  name|age|
+------+---+
|Brooke| 20|
| Denny| 31|
| Jules| 30|
|    TD| 35|
|Brooke| 25|
+------+---+

+------+--------+
|  name|avg(age)|
+------+--------+
|Brooke|    22.5|
| Jules|    30.0|
|    TD|    35.0|
| Denny|    31.0|
+------+--------+



In [3]:
# Create schema 
from pyspark.sql.types import *
schema = StructType([StructField("author", StringType(), False),
  StructField("title", StringType(), False),
  StructField("pages", IntegerType(), False)])
# In Python 
from pyspark.sql import SparkSession

# Define schema for our data using DDL 
schema = "`Id` INT, `First` STRING, `Last` STRING, `Url` STRING, `Published` STRING, `Hits` INT, `Campaigns` ARRAY<STRING>"

# Create our static data
data = [[1, "Jules", "Damji", "https://tinyurl.1", "1/4/2016", 4535, ["twitter",
"LinkedIn"]],
       [2, "Brooke","Wenig", "https://tinyurl.2", "5/5/2018", 8908, ["twitter",
"LinkedIn"]],
       [3, "Denny", "Lee", "https://tinyurl.3", "6/7/2019", 7659, ["web",
"twitter", "FB", "LinkedIn"]],
       [4, "Tathagata", "Das", "https://tinyurl.4", "5/12/2018", 10568, 
["twitter", "FB"]],
       [5, "Matei","Zaharia", "https://tinyurl.5", "5/14/2014", 40578, ["web",
"twitter", "FB", "LinkedIn"]],
       [6, "Reynold", "Xin", "https://tinyurl.6", "3/2/2015", 25568, 
["twitter", "LinkedIn"]]
      ]

# Main program
if __name__ == "__main__":
   # Create a SparkSession
   spark = (SparkSession
     .builder
     .appName("Example-3_6")
     .getOrCreate())
   # Create a DataFrame using the schema defined above
   blogs_df = spark.createDataFrame(data, schema)
   # Show the DataFrame; it should reflect our table above
   blogs_df.show()
   # Print the schema used by Spark to process the DataFrame
   print(blogs_df.printSchema())

+---+---------+-------+-----------------+---------+-----+--------------------+
| Id|    First|   Last|              Url|Published| Hits|           Campaigns|
+---+---------+-------+-----------------+---------+-----+--------------------+
|  1|    Jules|  Damji|https://tinyurl.1| 1/4/2016| 4535| [twitter, LinkedIn]|
|  2|   Brooke|  Wenig|https://tinyurl.2| 5/5/2018| 8908| [twitter, LinkedIn]|
|  3|    Denny|    Lee|https://tinyurl.3| 6/7/2019| 7659|[web, twitter, FB...|
|  4|Tathagata|    Das|https://tinyurl.4|5/12/2018|10568|       [twitter, FB]|
|  5|    Matei|Zaharia|https://tinyurl.5|5/14/2014|40578|[web, twitter, FB...|
|  6|  Reynold|    Xin|https://tinyurl.6| 3/2/2015|25568| [twitter, LinkedIn]|
+---+---------+-------+-----------------+---------+-----+--------------------+

root
 |-- Id: integer (nullable = true)
 |-- First: string (nullable = true)
 |-- Last: string (nullable = true)
 |-- Url: string (nullable = true)
 |-- Published: string (nullable = true)
 |-- Hits: integer (

In [4]:
from pyspark.sql import Row
blog_row = Row(6, "Reynold", "Xin", "https://tinyurl.6", 255568, "3/2/2015", 
  ["twitter", "LinkedIn"])
blog_row[1]

'Reynold'

In [5]:
# Row objects can be used to create DataFrames if you need them for quick interactivity and exploration:
rows = [Row("Matei Zaharia", "CA"), Row("Reynold Xin", "CA")]
authors_df = spark.createDataFrame(rows, ["Authors", "State"])
authors_df.show()

+-------------+-----+
|      Authors|State|
+-------------+-----+
|Matei Zaharia|   CA|
|  Reynold Xin|   CA|
+-------------+-----+



In [6]:
# Create a DataFrame using SparkSession
# spark = (SparkSession
#          .builder.appName("AuthorsAges").getOrCreate())
import pandas as pd

# sf_data = spark.read.csv("Data/sf-fire-calls.csv")


In [7]:
from pyspark.sql.types import *

# Programmatic way to define a schema 
fire_schema = StructType([StructField('CallNumber', IntegerType(), True),
                StructField('UnitID', StringType(), True),
                StructField('IncidentNumber', IntegerType(), True),
                StructField('CallType', StringType(), True),                  
                StructField('CallDate', StringType(), True),      
                StructField('WatchDate', StringType(), True),
                StructField('CallFinalDisposition', StringType(), True),
                StructField('AvailableDtTm', StringType(), True),
                StructField('Address', StringType(), True),       
                StructField('City', StringType(), True),       
                StructField('Zipcode', IntegerType(), True),       
                StructField('Battalion', StringType(), True),                 
                StructField('StationArea', StringType(), True),       
                StructField('Box', StringType(), True),       
                StructField('OriginalPriority', StringType(), True),       
                StructField('Priority', StringType(), True),       
                StructField('FinalPriority', IntegerType(), True),       
                StructField('ALSUnit', BooleanType(), True),       
                StructField('CallTypeGroup', StringType(), True),
                StructField('NumAlarms', IntegerType(), True),
                StructField('UnitType', StringType(), True),
                StructField('UnitSequenceInCallDispatch', IntegerType(), True),
                StructField('FirePreventionDistrict', StringType(), True),
                StructField('SupervisorDistrict', StringType(), True),
                StructField('Neighborhood', StringType(), True),
                StructField('Location', StringType(), True),
                StructField('RowID', StringType(), True),
                StructField('Delay', FloatType(), True)])

# Use the DataFrameReader interface to read a CSV file
sf_fire_file = "Data/sf-fire-calls.csv"
fire_df = spark.read.csv(sf_fire_file, header=True, schema=fire_schema)
df = fire_df.select("*").toPandas()
df


Unnamed: 0,CallNumber,UnitID,IncidentNumber,CallType,CallDate,WatchDate,CallFinalDisposition,AvailableDtTm,Address,City,...,CallTypeGroup,NumAlarms,UnitType,UnitSequenceInCallDispatch,FirePreventionDistrict,SupervisorDistrict,Neighborhood,Location,RowID,Delay
0,20110016,T13,2003235,Structure Fire,01/11/2002,01/10/2002,Other,01/11/2002 01:51:44 AM,2000 Block of CALIFORNIA ST,SF,...,,1,TRUCK,2.0,4,5,Pacific Heights,"(37.7895840679362, -122.428071912459)",020110016-T13,2.950000
1,20110022,M17,2003241,Medical Incident,01/11/2002,01/10/2002,Other,01/11/2002 03:01:18 AM,0 Block of SILVERVIEW DR,SF,...,,1,MEDIC,1.0,10,10,Bayview Hunters Point,"(37.7337623673897, -122.396113802632)",020110022-M17,4.700000
2,20110023,M41,2003242,Medical Incident,01/11/2002,01/10/2002,Other,01/11/2002 02:39:50 AM,MARKET ST/MCALLISTER ST,SF,...,,1,MEDIC,2.0,3,6,Tenderloin,"(37.7811772186856, -122.411699931232)",020110023-M41,2.433333
3,20110032,E11,2003250,Vehicle Fire,01/11/2002,01/10/2002,Other,01/11/2002 04:16:46 AM,APPLETON AV/MISSION ST,SF,...,,1,ENGINE,1.0,6,9,Bernal Heights,"(37.7388432849018, -122.423948785199)",020110032-E11,1.500000
4,20110043,B04,2003259,Alarms,01/11/2002,01/10/2002,Other,01/11/2002 06:01:58 AM,1400 Block of SUTTER ST,SF,...,,1,CHIEF,2.0,4,2,Western Addition,"(37.7872890372638, -122.424236212664)",020110043-B04,3.483333
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
175291,183034235,T08,18127270,Structure Fire,10/30/2018,10/30/2018,Fire,10/30/2018 10:04:27 PM,700 Block of LONG BRIDGE ST,San Francisco,...,Alarm,1,TRUCK,3.0,3,6,Mission Bay,"(37.77139049567131, -122.39416141919062)",183034235-T08,2.483333
175292,183034238,86,18127271,Medical Incident,10/30/2018,10/30/2018,Code 2 Transport,10/30/2018 11:18:37 PM,300 Block of WILLIAMS AVE,San Francisco,...,Potentially Life-Threatening,1,MEDIC,2.0,10,10,Bayview Hunters Point,"(37.73017420329081, -122.39943089033613)",183034238-86,1.950000
175293,183034268,E08,18127272,Medical Incident,10/30/2018,10/30/2018,Code 2 Transport,10/30/2018 10:26:01 PM,1100 Block of 4TH ST,San Francisco,...,Potentially Life-Threatening,1,ENGINE,1.0,3,6,Mission Bay,"(37.77389059449051, -122.3915765620042)",183034268-E08,4.416667
175294,183034485,KM03,18127291,Medical Incident,10/30/2018,10/30/2018,Code 2 Transport,10/31/2018 12:52:43 AM,800 Block of HAIGHT ST,San Francisco,...,Non Life-threatening,1,PRIVATE,1.0,5,5,Haight Ashbury,"(37.77131921762313, -122.43619838315635)",183034485-KM03,3.566667


In [8]:
# Parquet is an open source file format available to any project in the Hadoop ecosystem. 
# Apache Parquet is designed for efficient as well as performant flat columnar storage format of data 
# compared to row based files like CSV or TSV files.

parquet_path = fire_df.write.format("parquet").save(parquet_path)

NameError: name 'parquet_path' is not defined

In [11]:
few_fire_df = (fire_df
  .select("IncidentNumber", "AvailableDtTm", "CallType") 
  .filter("CallType == 'Medical Incident'")).toPandas()
few_fire_df

Unnamed: 0,IncidentNumber,AvailableDtTm,CallType
0,2003241,01/11/2002 03:01:18 AM,Medical Incident
1,2003242,01/11/2002 02:39:50 AM,Medical Incident
2,2003343,01/11/2002 12:06:57 PM,Medical Incident
3,2003348,01/11/2002 01:08:40 PM,Medical Incident
4,2003381,01/11/2002 03:31:02 PM,Medical Incident
...,...,...,...
113789,18127265,10/30/2018 10:57:36 PM,Medical Incident
113790,18127271,10/30/2018 11:18:37 PM,Medical Incident
113791,18127272,10/30/2018 10:26:01 PM,Medical Incident
113792,18127291,10/31/2018 12:52:43 AM,Medical Incident


In [12]:
# Projections and filters
from pyspark.sql.functions import *
(
    fire_df
    .select("CallType")
    .where(col("CallType").isNotNull())
    .agg(countDistinct("CallType").alias("DistinctCallTypes"))
    .show()
)
(
    fire_df
    .select("CallType")
    .where(col("CallType").isNotNull())
    .show()
)

+-----------------+
|DistinctCallTypes|
+-----------------+
|               30|
+-----------------+

+----------------+
|        CallType|
+----------------+
|  Structure Fire|
|Medical Incident|
|Medical Incident|
|    Vehicle Fire|
|          Alarms|
|  Structure Fire|
|          Alarms|
|          Alarms|
|Medical Incident|
|Medical Incident|
|Medical Incident|
|  Structure Fire|
|Medical Incident|
|Medical Incident|
|  Structure Fire|
|  Structure Fire|
|  Structure Fire|
|Medical Incident|
|Medical Incident|
|Medical Incident|
+----------------+
only showing top 20 rows



In [13]:
# Renaming, adding, and dropping columns
# Renaming
new_fire_df = fire_df.withColumnRenamed("Delay", "ResponseDelayedinMins")
(new_fire_df
 .select("ResponseDelayedinMins")
 .where(col("ResponseDelayedinMins") > 5).show(5,False))

+---------------------+
|ResponseDelayedinMins|
+---------------------+
|5.35                 |
|6.25                 |
|5.2                  |
|5.6                  |
|7.25                 |
+---------------------+
only showing top 5 rows



In [14]:
#Retype
fire_ts_df = (
    new_fire_df
    .withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy"))
    .drop("callDate")
    .withColumn("OnWatchDate", to_timestamp(col("WatchDate"), "MM/dd/yyyy"))
    .drop("OnWatchDate")
    .withColumn("AvailableDtTS", to_timestamp(col("AvailableDtTm"), "MM/dd/yyyy hh:mm:ss a"))
    .drop("AvailableDtTS")
)

In [15]:
(fire_ts_df
 .select("IncidentDate", "WatchDate", "AvailableDtTm").show(5, False)
)
print("-----------------------------------------------------------")
(
    fire_ts_df
    .select(year("IncidentDate"))
    .distinct()
    .orderBy(year("IncidentDate")).show()
)
print('-------------------------------------------------------------')
(new_fire_df.select("CallDate").show(5,False))

+-------------------+----------+----------------------+
|IncidentDate       |WatchDate |AvailableDtTm         |
+-------------------+----------+----------------------+
|2002-01-11 00:00:00|01/10/2002|01/11/2002 01:51:44 AM|
|2002-01-11 00:00:00|01/10/2002|01/11/2002 03:01:18 AM|
|2002-01-11 00:00:00|01/10/2002|01/11/2002 02:39:50 AM|
|2002-01-11 00:00:00|01/10/2002|01/11/2002 04:16:46 AM|
|2002-01-11 00:00:00|01/10/2002|01/11/2002 06:01:58 AM|
+-------------------+----------+----------------------+
only showing top 5 rows

-----------------------------------------------------------
+------------------+
|year(IncidentDate)|
+------------------+
|              2000|
|              2001|
|              2002|
|              2003|
|              2004|
|              2005|
|              2006|
|              2007|
|              2008|
|              2009|
|              2010|
|              2011|
|              2012|
|              2013|
|              2014|
|              2015|
|           

In [16]:
# Aggregations
(
    fire_ts_df
    .select("CallType")
    .where(col("CallType").isNotNull())
    .groupBy("CallType")
    .count()
    .orderBy("count", ascending = True)
    .show(n = 10, truncate = False)
)

+--------------------------------------------+-----+
|CallType                                    |count|
+--------------------------------------------+-----+
|Administrative                              |3    |
|Mutual Aid / Assist Outside Agency          |9    |
|Confined Space / Structure Collapse         |13   |
|Marine Fire                                 |14   |
|Suspicious Package                          |15   |
|Oil Spill                                   |21   |
|Extrication / Entrapped (Machinery, Vehicle)|28   |
|Watercraft in Distress                      |28   |
|High Angle Rescue                           |32   |
|Assist Police                               |35   |
+--------------------------------------------+-----+
only showing top 10 rows



In [17]:
import pyspark.sql.functions as F
(
    fire_ts_df
    .select(F.sum("NumAlarms"), F.avg("ResponseDelayedinMins"), 
            F.min("ResponseDelayedinMins"), F.max("ResponseDelayedinMins")).show()
)
# For more advanced statistical needs common with data science workloads, 
# read the API documentation for methods like stat(), describe(), correlation(), covariance(), sampleBy(), 
# approxQuantile(), frequentItems(), and so on.

+--------------+--------------------------+--------------------------+--------------------------+
|sum(NumAlarms)|avg(ResponseDelayedinMins)|min(ResponseDelayedinMins)|max(ResponseDelayedinMins)|
+--------------+--------------------------+--------------------------+--------------------------+
|        176170|         3.892364154521585|               0.016666668|                   1844.55|
+--------------+--------------------------+--------------------------+--------------------------+



In [18]:
# The DataFrame API
from pyspark.sql import Row
row = Row(350, True, "Learning Spark 2E", None)
row[2]

'Learning Spark 2E'

In [37]:
# mnm_df = spark.read.csv("Data/mnm_dataset.csv")
mnm_df = (spark.read.format("csv") 
     .option("header", "true") 
     .option("inferSchema", "true") 
     .load('Data/mnm_dataset.csv'))

In [38]:
count_mnm_df = (mnm_df
  .select("State", "Color", "Count") 
  .groupBy("State", "Color") 
  .agg(count("Count") 
  .alias("Total")) 
  .orderBy("Total", ascending=False))

In [39]:
count_mnm_df.explain(True)


== Parsed Logical Plan ==
'Sort ['Total DESC NULLS LAST], true
+- Aggregate [State#707, Color#708], [State#707, Color#708, count(Count#709) AS Total#720L]
   +- Project [State#707, Color#708, Count#709]
      +- Relation[State#707,Color#708,Count#709] csv

== Analyzed Logical Plan ==
State: string, Color: string, Total: bigint
Sort [Total#720L DESC NULLS LAST], true
+- Aggregate [State#707, Color#708], [State#707, Color#708, count(Count#709) AS Total#720L]
   +- Project [State#707, Color#708, Count#709]
      +- Relation[State#707,Color#708,Count#709] csv

== Optimized Logical Plan ==
Sort [Total#720L DESC NULLS LAST], true
+- Aggregate [State#707, Color#708], [State#707, Color#708, count(Count#709) AS Total#720L]
   +- Relation[State#707,Color#708,Count#709] csv

== Physical Plan ==
*(3) Sort [Total#720L DESC NULLS LAST], true, 0
+- Exchange rangepartitioning(Total#720L DESC NULLS LAST, 200), ENSURE_REQUIREMENTS, [id=#416]
   +- *(2) HashAggregate(keys=[State#707, Color#708], function