# Intro to DataFrames

References:

- DataFrame API docs: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame

Rules of thumb:
- Hit tab to auto-complete
- To see all available methods, place a dot (.) after the RDD (e.g. words.) and hit tab 
- Use `.collect()` to see the contents of the RDD

Solutions for potentially challenging exercises can be found in the end of the section. Don't peek unless you're really stuck!

In [1]:
# like in the pyspark shell, SparkSession is already defined
spark

## 1. DataFrame methods

### 1.1 Data input

In [4]:
df = spark.read.json("../data/people/names.json")
df.head()
# other supported file formats:
# spark.read.parquet("../data/pems_sorted/")
# spark.read.text()
# spark.read.csv()
# spark.read.orc()

# generic form: 
# spark.read.load("path/to/someFile.csv", format="csv", sep=":", inferSchema="true", header="true")

# Loading data from a JDBC source
# jdbcDF = spark.read \
#     .format("jdbc") \
#     .option("url", "jdbc:postgresql:dbserver") \
#     .option("dbtable", "schema.tablename") \
#     .option("user", "username") \
#     .option("password", "password") \
#     .load()

Row(AGE=36, gender='female', height=180, name='Zoe')

In [8]:
# TODO: write reading different files in ../data
word_df = spark.read.text("../data/word_count")
word_df.head()

Row(value='Basics of the Unix Philosophy')

In [16]:
spark.read.parquet("../data/pems_sorted/").head(2)

[Row(timeperiod='09/04/2016 00:00:19', flow1=5, occupancy1=0.025, speed1=78.0, flow2=7, occupancy2=0.0311, speed2=71.0, flow3=1, occupancy3=0.4706, speed3=1.0, flow4=None, occupancy4=None, speed4=None, flow5=None, occupancy5=None, speed5=None, flow6=None, occupancy6=None, speed6=None, flow7=None, occupancy7=None, speed7=None, flow8=None, occupancy8=None, speed8=None, station=402260),
 Row(timeperiod='09/04/2016 00:00:49', flow1=0, occupancy1=0.0, speed1=0.0, flow2=0, occupancy2=0.0, speed2=0.0, flow3=0, occupancy3=0.0, speed3=0.0, flow4=None, occupancy4=None, speed4=None, flow5=None, occupancy5=None, speed5=None, flow6=None, occupancy6=None, speed6=None, flow7=None, occupancy7=None, speed7=None, flow8=None, occupancy8=None, speed8=None, station=402260)]

In [14]:
spark.read.csv("../data/credit_card").head(2)

[Row(_c0='Time', _c1='V1', _c2='V2', _c3='V3', _c4='V4', _c5='V5', _c6='V6', _c7='V7', _c8='V8', _c9='V9', _c10='V10', _c11='V11', _c12='V12', _c13='V13', _c14='V14', _c15='V15', _c16='V16', _c17='V17', _c18='V18', _c19='V19', _c20='V20', _c21='V21', _c22='V22', _c23='V23', _c24='V24', _c25='V25', _c26='V26', _c27='V27', _c28='V28', _c29='Amount', _c30='Class'),
 Row(_c0='0', _c1='-1.3598071336738', _c2='-0.0727811733098497', _c3='2.53634673796914', _c4='1.37815522427443', _c5='-0.338320769942518', _c6='0.462387777762292', _c7='0.239598554061257', _c8='0.0986979012610507', _c9='0.363786969611213', _c10='0.0907941719789316', _c11='-0.551599533260813', _c12='-0.617800855762348', _c13='-0.991389847235408', _c14='-0.311169353699879', _c15='1.46817697209427', _c16='-0.470400525259478', _c17='0.207971241929242', _c18='0.0257905801985591', _c19='0.403992960255733', _c20='0.251412098239705', _c21='-0.018306777944153', _c22='0.277837575558899', _c23='-0.110473910188767', _c24='0.066928074914673

### 1.2 Data output (writing to disk)

- API docs: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter

In [17]:
# writing to a file
df.write.parquet("new_data.parquet")

In [None]:
# overwrite on save
df.write.mode("overwrite").parquet("new_data.parquet")

In [None]:
# You can read from any format and write to any format (barring formatting limitations/rules):
# df.write.csv("new_data.csv",header=True)
# df.write.json("new_data.json")
# df.write.orc("new_data.orc")
# df.write.parquet("new_data.parquet")

# generic form:
# df.write.save("fileName.parquet", format="parquet")
# df.write.mode("overwrite").save("fileName.parquet", format="parquet")

# Saving data to a JDBC source
# jdbcDF.write \
#     .format("jdbc") \
#     .option("url", "jdbc:postgresql:dbserver") \
#     .option("dbtable", "schema.tablename") \
#     .option("user", "username") \
#     .option("password", "password") \
#     .save()


### Exploring DataFrames

In [18]:
df = spark.read.json("../data/people/names.json")

In [19]:
df.head(5)

[Row(AGE=36, gender='female', height=180, name='Zoe'),
 Row(AGE=23, gender='female', height=165, name='Alice'),
 Row(AGE=30, gender='male', height=175, name='Andy'),
 Row(AGE=25, gender='female', height=170, name='Jane'),
 Row(AGE=None, gender='male', height=165, name='Michael')]

In [20]:
df.show(5)

+----+------+------+-------+
| AGE|gender|height|   name|
+----+------+------+-------+
|  36|female|   180|    Zoe|
|  23|female|   165|  Alice|
|  30|  male|   175|   Andy|
|  25|female|   170|   Jane|
|null|  male|   165|Michael|
+----+------+------+-------+
only showing top 5 rows



In [21]:
df.take(5)

[Row(AGE=36, gender='female', height=180, name='Zoe'),
 Row(AGE=23, gender='female', height=165, name='Alice'),
 Row(AGE=30, gender='male', height=175, name='Andy'),
 Row(AGE=25, gender='female', height=170, name='Jane'),
 Row(AGE=None, gender='male', height=165, name='Michael')]

In [22]:
# limit(n) returns a new dataframe with the first n rows of the dataframe
df.limit(3).show()

+---+------+------+-----+
|AGE|gender|height| name|
+---+------+------+-----+
| 36|female|   180|  Zoe|
| 23|female|   165|Alice|
| 30|  male|   175| Andy|
+---+------+------+-----+



In [23]:
df.show()

+----+------+------+-------+
| AGE|gender|height|   name|
+----+------+------+-------+
|  36|female|   180|    Zoe|
|  23|female|   165|  Alice|
|  30|  male|   175|   Andy|
|  25|female|   170|   Jane|
|null|  male|   165|Michael|
|  19|  male|   180| Justin|
+----+------+------+-------+



In [24]:
df.printSchema()

root
 |-- AGE: long (nullable = true)
 |-- gender: string (nullable = true)
 |-- height: long (nullable = true)
 |-- name: string (nullable = true)



In [25]:
df.columns

['AGE', 'gender', 'height', 'name']

In [26]:
df.count()

6

In [27]:
df.describe().show()

+-------+-----------------+------+-----------------+-----+
|summary|              AGE|gender|           height| name|
+-------+-----------------+------+-----------------+-----+
|  count|                5|     6|                6|    6|
|   mean|             26.6|  null|            172.5| null|
| stddev|6.580273550544841|  null|6.892024376045112| null|
|    min|               19|female|              165|Alice|
|    max|               36|  male|              180|  Zoe|
+-------+-----------------+------+-----------------+-----+



### Selecting specific columns in a dataframe

In [34]:
# selecting a column
# Column API docs: http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.Column
df['name']



Column<b'name'>

In [30]:
# creating a new dataframe with only selected columns
df.select('name')

Row(name='Zoe')

In [31]:
# creating a new dataframe with only selected columns
df.select(['name', 'age']).show()

+-------+----+
|   name| age|
+-------+----+
|    Zoe|  36|
|  Alice|  23|
|   Andy|  30|
|   Jane|  25|
|Michael|null|
| Justin|  19|
+-------+----+



In [36]:
# renaming columns
df = df.withColumnRenamed('AGE', 'age')

In [37]:
df

DataFrame[age: bigint, gender: string, height: bigint, name: string]

In [38]:
# Creating new columns
df = df.withColumn('height plus 100', df.height + 100)
df.show()

+----+------+------+-------+---------------+
| age|gender|height|   name|height plus 100|
+----+------+------+-------+---------------+
|  36|female|   180|    Zoe|            280|
|  23|female|   165|  Alice|            265|
|  30|  male|   175|   Andy|            275|
|  25|female|   170|   Jane|            270|
|null|  male|   165|Michael|            265|
|  19|  male|   180| Justin|            280|
+----+------+------+-------+---------------+



In [39]:
# Creating new columns
df = df.withColumn('is_tall', df.height >= 175)
df.show()

+----+------+------+-------+---------------+-------+
| age|gender|height|   name|height plus 100|is_tall|
+----+------+------+-------+---------------+-------+
|  36|female|   180|    Zoe|            280|   true|
|  23|female|   165|  Alice|            265|  false|
|  30|  male|   175|   Andy|            275|   true|
|  25|female|   170|   Jane|            270|  false|
|null|  male|   165|Michael|            265|  false|
|  19|  male|   180| Justin|            280|   true|
+----+------+------+-------+---------------+-------+



### Filtering
`.filter()` takes in either (i) a `Column` of `types.BooleanType` or (ii) a string of SQL expression.

In [40]:
# filter using SQL expressions
# df.where('age >= 25').show() is also possible because .where() is an alias for .filter()
df.filter('age >= 25').show()

+---+------+------+----+---------------+-------+
|age|gender|height|name|height plus 100|is_tall|
+---+------+------+----+---------------+-------+
| 36|female|   180| Zoe|            280|   true|
| 30|  male|   175|Andy|            275|   true|
| 25|female|   170|Jane|            270|  false|
+---+------+------+----+---------------+-------+



In [41]:
# filter using a column of boolean types
df.filter(df.age >= 25).show()

+---+------+------+----+---------------+-------+
|age|gender|height|name|height plus 100|is_tall|
+---+------+------+----+---------------+-------+
| 36|female|   180| Zoe|            280|   true|
| 30|  male|   175|Andy|            275|   true|
| 25|female|   170|Jane|            270|  false|
+---+------+------+----+---------------+-------+



In [42]:
# df.age >= 25 returns a Column of booleans
df.age >= 25

Column<b'(age >= 25)'>

In [43]:
df.filter( (df.age >= 25) & (df.age <= 30) ).show()
# you can use df.age or df['age']
# you can replace & with | for 'or' operations

+---+------+------+----+---------------+-------+
|age|gender|height|name|height plus 100|is_tall|
+---+------+------+----+---------------+-------+
| 30|  male|   175|Andy|            275|   true|
| 25|female|   170|Jane|            270|  false|
+---+------+------+----+---------------+-------+



In [46]:
# TODO: try filtering based on other predicates
df.filter(df.gender == "female").show()

+---+------+------+-----+---------------+-------+
|age|gender|height| name|height plus 100|is_tall|
+---+------+------+-----+---------------+-------+
| 36|female|   180|  Zoe|            280|   true|
| 23|female|   165|Alice|            265|  false|
| 25|female|   170| Jane|            270|  false|
+---+------+------+-----+---------------+-------+



In [47]:
df.filter(df.is_tall).show()

+---+------+------+------+---------------+-------+
|age|gender|height|  name|height plus 100|is_tall|
+---+------+------+------+---------------+-------+
| 36|female|   180|   Zoe|            280|   true|
| 30|  male|   175|  Andy|            275|   true|
| 19|  male|   180|Justin|            280|   true|
+---+------+------+------+---------------+-------+



In [48]:
df.filter(df.name.startswith("A")).show()

+---+------+------+-----+---------------+-------+
|age|gender|height| name|height plus 100|is_tall|
+---+------+------+-----+---------------+-------+
| 23|female|   165|Alice|            265|  false|
| 30|  male|   175| Andy|            275|   true|
+---+------+------+-----+---------------+-------+



### groupBy

TL;DR - `.groupBy()` allows you to group rows together based on its value in some given column(s)
- `df.groupBy([cols])`
- [GroupedData operations](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.GroupedData) (alternatively, you can instantiate a variable with the type of GroupedData, let jupyter notebook's intellisense show you what methods are available:
    - `grouped = df.groupBy('gender')`
    - `grouped.` (and hit tab)

In [49]:
df.head()

Row(age=36, gender='female', height=180, name='Zoe', height plus 100=280, is_tall=True)

In [50]:
df.groupBy('gender')

<pyspark.sql.group.GroupedData at 0x1133c80f0>

In [53]:
grouped = df.groupBy('gender')

In [52]:
df.groupBy('gender').count().show()

+------+-----+
|gender|count|
+------+-----+
|female|    3|
|  male|    3|
+------+-----+



In [54]:
df.groupBy('gender').avg().show()

+------+--------+------------------+--------------------+
|gender|avg(age)|       avg(height)|avg(height plus 100)|
+------+--------+------------------+--------------------+
|female|    28.0|171.66666666666666|   271.6666666666667|
|  male|    24.5|173.33333333333334|   273.3333333333333|
+------+--------+------------------+--------------------+



In [55]:
# calculating total age and height of all people (i.e. don't groupby anything)
df.groupBy().sum().show()

+--------+-----------+--------------------+
|sum(age)|sum(height)|sum(height plus 100)|
+--------+-----------+--------------------+
|     133|       1035|                1635|
+--------+-----------+--------------------+



In [60]:
# TODO: calculate average height of all people 
df.select('height').groupBy().avg().show()

+-----------+
|avg(height)|
+-----------+
|      172.5|
+-----------+



In [65]:
# TODO: calculate max height for each gender
df.select(['gender', 'height']).groupBy('gender').max().show()

+------+-----------+
|gender|max(height)|
+------+-----------+
|female|        180|
|  male|        180|
+------+-----------+



In [69]:
# TODO: calculate min height for each gender
df.select(['gender', 'height']).groupBy('gender').min().show()

+------+-----------+
|gender|min(height)|
+------+-----------+
|female|        165|
|  male|        165|
+------+-----------+



### 1.2 Crimes Data

In [79]:
crimes = spark.read.csv("../data/crimes/Crimes_-_One_year_prior_to_present.csv", header=True, inferSchema=True)
crimes.show()
# try the above without the header and inferSchema option. see what happens!

+--------+--------------------+--------------------+-----+--------------------+----------------------+---------------------+------+--------+----+----+------+------------+------------+------------+-------------+--------------------+
|   CASE#| DATE  OF OCCURRENCE|               BLOCK| IUCR| PRIMARY DESCRIPTION| SECONDARY DESCRIPTION| LOCATION DESCRIPTION|ARREST|DOMESTIC|BEAT|WARD|FBI CD|X COORDINATE|Y COORDINATE|    LATITUDE|    LONGITUDE|            LOCATION|
+--------+--------------------+--------------------+-----+--------------------+----------------------+---------------------+------+--------+----+----+------+------------+------------+------------+-------------+--------------------+
|JB241987|04/28/2018 10:05:...|    009XX N LONG AVE| 2092|           NARCOTICS|  SOLICIT NARCOTICS...|             SIDEWALK|     Y|       N|1524|  37|    18|     1140136|     1905903|41.897894893|-87.760743714|(41.897894893, -8...|
|JA430240|09/06/2017 01:30:...|     032XX W 26TH ST| 0810|              

In [90]:
# TODO: print the schema of the dataframe (e.g. data type of each column)?
crimes.printSchema()

root
 |-- CASE#: string (nullable = true)
 |-- DATE  OF OCCURRENCE: string (nullable = true)
 |-- BLOCK: string (nullable = true)
 |--  IUCR: string (nullable = true)
 |--  PRIMARY DESCRIPTION: string (nullable = true)
 |--  SECONDARY DESCRIPTION: string (nullable = true)
 |--  LOCATION DESCRIPTION: string (nullable = true)
 |-- ARREST: string (nullable = true)
 |-- DOMESTIC: string (nullable = true)
 |-- BEAT: integer (nullable = true)
 |-- WARD: integer (nullable = true)
 |-- FBI CD: string (nullable = true)
 |-- X COORDINATE: integer (nullable = true)
 |-- Y COORDINATE: integer (nullable = true)
 |-- LATITUDE: double (nullable = true)
 |-- LONGITUDE: double (nullable = true)
 |-- LOCATION: string (nullable = true)



In [91]:
# TODO: how many rows are there in the dataframe?
crimes.count()

263191

In [94]:
# TODO: Display the first 2 rows
crimes.show(2)

+--------+--------------------+----------------+-----+--------------------+----------------------+---------------------+------+--------+----+----+------+------------+------------+------------+-------------+--------------------+
|   CASE#| DATE  OF OCCURRENCE|           BLOCK| IUCR| PRIMARY DESCRIPTION| SECONDARY DESCRIPTION| LOCATION DESCRIPTION|ARREST|DOMESTIC|BEAT|WARD|FBI CD|X COORDINATE|Y COORDINATE|    LATITUDE|    LONGITUDE|            LOCATION|
+--------+--------------------+----------------+-----+--------------------+----------------------+---------------------+------+--------+----+----+------+------------+------------+------------+-------------+--------------------+
|JB241987|04/28/2018 10:05:...|009XX N LONG AVE| 2092|           NARCOTICS|  SOLICIT NARCOTICS...|             SIDEWALK|     Y|       N|1524|  37|    18|     1140136|     1905903|41.897894893|-87.760743714|(41.897894893, -8...|
|JA430240|09/06/2017 01:30:...| 032XX W 26TH ST| 0810|               THEFT|             

In [96]:
# TODO: What columns are in the dataframe?
crimes.columns

['CASE#',
 'DATE  OF OCCURRENCE',
 'BLOCK',
 ' IUCR',
 ' PRIMARY DESCRIPTION',
 ' SECONDARY DESCRIPTION',
 ' LOCATION DESCRIPTION',
 'ARREST',
 'DOMESTIC',
 'BEAT',
 'WARD',
 'FBI CD',
 'X COORDINATE',
 'Y COORDINATE',
 'LATITUDE',
 'LONGITUDE',
 'LOCATION']

In [97]:
# Let's rename the improperly formatted column names
columnNames = crimes.columns
for col in columnNames:
    crimes = crimes.withColumnRenamed(col, col.strip())
    
crimes.columns

['CASE#',
 'DATE  OF OCCURRENCE',
 'BLOCK',
 'IUCR',
 'PRIMARY DESCRIPTION',
 'SECONDARY DESCRIPTION',
 'LOCATION DESCRIPTION',
 'ARREST',
 'DOMESTIC',
 'BEAT',
 'WARD',
 'FBI CD',
 'X COORDINATE',
 'Y COORDINATE',
 'LATITUDE',
 'LONGITUDE',
 'LOCATION']

In [101]:
# TODO: How many cases resulted in arrest, and how many didn’t?
# Hint: Highlight whitespace between this cell and the next cell to see the hint
crimes.groupBy('ARREST').count().show()


+------+------+
|ARREST| count|
+------+------+
|     Y| 50126|
|     N|213065|
+------+------+



<font color="white">Use .groupBy("ARREST")</font>

In [108]:
# TODO: List the total count of cases for each WARD
crimes.filter(crimes.WARD.isNotNull()).groupBy('WARD').count().show()

+----+-----+
|WARD|count|
+----+-----+
|  31| 3258|
|  34| 6845|
|  28|11071|
|  27|10000|
|  26| 3498|
|  44| 3968|
|  12| 3036|
|  22| 3014|
|  47| 2708|
|   1| 4873|
|  13| 3086|
|  16| 6234|
|   6| 8427|
|   3| 6860|
|  20| 7608|
|  40| 2941|
|  48| 2641|
|   5| 6252|
|  19| 2207|
|  41| 2913|
+----+-----+
only showing top 20 rows



In [112]:
# TODO: List the total count of cases for each WARD, and sort it (by count) in ascending order
crimes.groupBy('WARD').count().sort('count').show()

+----+-----+
|WARD|count|
+----+-----+
|null|    1|
|  19| 2207|
|  36| 2536|
|  39| 2598|
|  48| 2641|
|  33| 2698|
|  47| 2708|
|  38| 2738|
|  45| 2761|
|  50| 2877|
|  41| 2913|
|  40| 2941|
|  22| 3014|
|  12| 3036|
|  13| 3086|
|  14| 3146|
|  23| 3155|
|  35| 3181|
|  11| 3216|
|  30| 3249|
+----+-----+
only showing top 20 rows



In [113]:
# TODO: Show top 10 (WARD, count) pairs with the most number of cases
# To sort in descending order, use the desc() function - .sort(desc("count"))
from pyspark.sql.functions import desc
crimes.groupBy('WARD').count().sort(desc('count')).show(10)


+----+-----+
|WARD|count|
+----+-----+
|  42|18548|
|  24|12457|
|   2|11483|
|  28|11071|
|  27|10000|
|  17| 8595|
|   6| 8427|
|  21| 8043|
|  20| 7608|
|   3| 6860|
+----+-----+
only showing top 10 rows



In [115]:
# TODO: List top 15 categories (PRIMARY DESCRIPTION) of cases
from pyspark.sql.functions import desc
crimes.groupBy('PRIMARY DESCRIPTION').count().sort(desc('count')).show(15)

+--------------------+-----+
| PRIMARY DESCRIPTION|count|
+--------------------+-----+
|               THEFT|64285|
|             BATTERY|49276|
|     CRIMINAL DAMAGE|28118|
|             ASSAULT|19740|
|  DECEPTIVE PRACTICE|17923|
|       OTHER OFFENSE|16561|
|            BURGLARY|12040|
|           NARCOTICS|11664|
|             ROBBERY|11080|
| MOTOR VEHICLE THEFT|10558|
|   CRIMINAL TRESPASS| 6832|
|   WEAPONS VIOLATION| 5003|
|OFFENSE INVOLVING...| 2247|
| CRIM SEXUAL ASSAULT| 1539|
|PUBLIC PEACE VIOL...| 1386|
+--------------------+-----+
only showing top 15 rows



In [116]:
# TODO: List top 5 locations (LOCATION DESCRIPTION) where cases occur
from pyspark.sql.functions import desc
crimes.groupBy('LOCATION DESCRIPTION').count().sort(desc('count')).show(5)

+--------------------+-----+
|LOCATION DESCRIPTION|count|
+--------------------+-----+
|              STREET|58758|
|           RESIDENCE|43732|
|           APARTMENT|33277|
|            SIDEWALK|20542|
|               OTHER|10791|
+--------------------+-----+
only showing top 5 rows



In [121]:
# TODO: Save one of the results to disk (choose any format)
# Note: if your dataframe ends up being partitioned, you can call `your_df.coalesce(1)` before saving (`df.coalesce(1).write...`)
from pyspark.sql.functions import desc
top_locations = crimes.groupBy('LOCATION DESCRIPTION').count().sort(desc('count')).limit(5)
top_locations.write.csv("top_five_crime_locations",header=True)

In [None]:
# TODO: submit the preceeding task as a spark job
# 1. Create a python file named jobs/top_20_crime_locations.py
# 2. define spark session object
#   - from pyspark.sql import SparkSession
#   - spark = SparkSession.builder.appName("MyAppName").getOrCreate()
# 3. Copy the code in the preceeding cell into the file 
# 4. submit the job: ${SPARK_HOME}/bin/spark-submit --master local ./jobs/top_20_crime_locations.py

# if you get stuck, you can refer to ./jobs/top_N_crime_locations_solution.py

In [None]:
# TODO: Use your creativity - create any other interesting DataFrames or insights into the crimes data!

## Using SQL with Spark DataFrames

In [None]:
df = spark.read.json("../data/people/names.json")

In [None]:
df.createOrReplaceTempView('names')

In [None]:
spark.sql("SELECT * FROM names")
# add .show() to see the resulting dataframe. Example:
# df = spark.sql("SELECT * FROM names")
# df.show()

In [None]:
spark.sql("SELECT * FROM names WHERE height > 170").show()