## Basic PySpark configuration

In [1]:
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.getOrCreate()
print(spark)

<pyspark.sql.session.SparkSession object at 0x7f499813ad10>


In [4]:
rdd=spark.sparkContext.parallelize([1,2,3,4,5,6])

## Basic RDD creation

In [8]:
print("Count: ", rdd.count())
print("RDD: ", rdd.collect())

Count:  6
RDD:  [1, 2, 3, 4, 5, 6]


In [9]:
# Create RDD from parallelize    
dataList = [("Java", 20000), ("Python", 100000), ("Scala", 3000)]
rdd=spark.sparkContext.parallelize(dataList)

In [10]:
print("Count: ", rdd.count())
print("RDD: ", rdd.collect())

Count:  3
RDD:  [('Java', 20000), ('Python', 100000), ('Scala', 3000)]


### Some basic SQL queries

In [11]:
data = [('James','','Smith','1991-04-01','M',3000),
  ('Michael','Rose','','2000-05-19','M',4000),
  ('Robert','','Williams','1978-09-05','M',4000),
  ('Maria','Anne','Jones','1967-12-01','F',4000),
  ('Jen','Mary','Brown','1980-02-17','F',-1)
]

columns = ["firstname","middlename","lastname","dob","gender","salary"]
df = spark.createDataFrame(data=data, schema = columns)

In [12]:
df.show()

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|    James|          |   Smith|1991-04-01|     M|  3000|
|  Michael|      Rose|        |2000-05-19|     M|  4000|
|   Robert|          |Williams|1978-09-05|     M|  4000|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|
+---------+----------+--------+----------+------+------+



In [14]:
# SQL query
df.createOrReplaceTempView("PERSON_DATA")
df2 = spark.sql("SELECT * from PERSON_DATA")
df2.printSchema()
df2.show()


root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|    James|          |   Smith|1991-04-01|     M|  3000|
|  Michael|      Rose|        |2000-05-19|     M|  4000|
|   Robert|          |Williams|1978-09-05|     M|  4000|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|
+---------+----------+--------+----------+------+------+



In [16]:
# GroupBy example

groupDF = spark.sql("SELECT gender, count(*) from PERSON_DATA group by gender")
groupDF.show()

+------+--------+
|gender|count(1)|
+------+--------+
|     M|       3|
|     F|       2|
+------+--------+



## Working with CSV DF

[PySpark Read CSV file into DataFrame](https://sparkbyexamples.com/pyspark/pyspark-read-csv-file-into-dataframe/)

In [22]:
# Took dataset on Crime in Atlanta 2009-2017 ~30Mb
# https://data.world/bryantahb/crime-in-atlanta-2009-2017

csv_df = spark.read.option("header", True).option("inferSchema",True).csv("atlcrime.csv")
csv_df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- crime: string (nullable = true)
 |-- number: long (nullable = true)
 |-- date: string (nullable = true)
 |-- location: string (nullable = true)
 |-- beat: string (nullable = true)
 |-- neighborhood: string (nullable = true)
 |-- npu: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)



In [24]:
csv_df.show(3)

+---+--------------------+---------+----------+--------------------+----+------------------+---+--------+---------+
|_c0|               crime|   number|      date|            location|beat|      neighborhood|npu|     lat|     long|
+---+--------------------+---------+----------+--------------------+----+------------------+---+--------+---------+
|  0| LARCENY-NON VEHICLE|103040029|10/31/2010|    610 SPRING ST NW| 509|          Downtown|  M|33.77101|-84.38895|
|  1|          AUTO THEFT|103040061|10/31/2010|       850 OAK ST SW| 401|          West End|  T|33.74057| -84.4168|
|  2|LARCENY-FROM VEHICLE|103040169|10/31/2010|1344 METROPOLITAN...| 301|Capitol View Manor|  X|33.71803|-84.40774|
+---+--------------------+---------+----------+--------------------+----+------------------+---+--------+---------+
only showing top 3 rows



In [48]:
# Read using schema with types
# PySpark StructType & StructField Explained with Examples
# https://sparkbyexamples.com/pyspark/pyspark-structtype-and-structfield/

from pyspark.sql.types import StructType,StructField, StringType, IntegerType, DateType, DoubleType, LongType

schema = StructType([ \
    StructField("_c0",LongType(),True), \
    StructField("crime",StringType(),True), \
    StructField("number",LongType(),True), \
    StructField("date",StringType(),True), \
    StructField("location", StringType(), True), \
    StructField("beat", StringType(), True), \
    StructField("neighborhood", StringType(), True), \
    StructField("npu", StringType(), True), \
    StructField("lat", DoubleType(), True), \
    StructField("long", DoubleType(), True) \
  ])


df = spark.read.option("header", True).schema(schema).csv("atlcrime.csv")
df.printSchema()
df.show(10)

root
 |-- _c0: long (nullable = true)
 |-- crime: string (nullable = true)
 |-- number: long (nullable = true)
 |-- date: string (nullable = true)
 |-- location: string (nullable = true)
 |-- beat: string (nullable = true)
 |-- neighborhood: string (nullable = true)
 |-- npu: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)

+---+--------------------+---------+----------+--------------------+----+--------------------+---+--------+---------+
|_c0|               crime|   number|      date|            location|beat|        neighborhood|npu|     lat|     long|
+---+--------------------+---------+----------+--------------------+----+--------------------+---+--------+---------+
|  0| LARCENY-NON VEHICLE|103040029|10/31/2010|    610 SPRING ST NW| 509|            Downtown|  M|33.77101|-84.38895|
|  1|          AUTO THEFT|103040061|10/31/2010|       850 OAK ST SW| 401|            West End|  T|33.74057| -84.4168|
|  2|LARCENY-FROM VEHICLE|103040169|

In [49]:
# https://sparkbyexamples.com/pyspark/pyspark-repartition-vs-coalesce/
import os

os.system('rm -rf csv_rdd')

print("Partitions: ", df.rdd.getNumPartitions())
df.rdd.saveAsTextFile("csv_rdd")

Partitions:  7


## Sample Data pipeline

In [50]:
from pyspark.sql.functions import *

# Conver String to Date type
df = df.withColumn('date_imr', to_date('date', "MM/dd/yyyy"))
df = df.drop("date")

In [51]:
df.show(5)
df.printSchema()

+---+--------------------+---------+--------------------+----+------------------+---+--------+---------+----------+
|_c0|               crime|   number|            location|beat|      neighborhood|npu|     lat|     long|  date_imr|
+---+--------------------+---------+--------------------+----+------------------+---+--------+---------+----------+
|  0| LARCENY-NON VEHICLE|103040029|    610 SPRING ST NW| 509|          Downtown|  M|33.77101|-84.38895|2010-10-31|
|  1|          AUTO THEFT|103040061|       850 OAK ST SW| 401|          West End|  T|33.74057| -84.4168|2010-10-31|
|  2|LARCENY-FROM VEHICLE|103040169|1344 METROPOLITAN...| 301|Capitol View Manor|  X|33.71803|-84.40774|2010-10-31|
|  3|          AUTO THEFT|103040174|    1752 PRYOR RD SW| 307|    Betmar LaVilla|  Y|33.70731|-84.39674|2010-10-31|
|  4| LARCENY-NON VEHICLE|103040301|JOHN WESLEY DOBBS...| 604|   Old Fourth Ward|  M|33.75947|-84.36626|2010-10-31|
+---+--------------------+---------+--------------------+----+----------

In [55]:
df = df.withColumnRenamed("date_imr", "date")
df = df.withColumnRenamed("_c0", "id")


## Count how much crimes are happening per neighbourhood

In [60]:
df.show(5, truncate=False)

+---+--------------------+---------+---------------------------------------+----+------------------+---+--------+---------+----------+
|id |crime               |number   |location                               |beat|neighborhood      |npu|lat     |long     |date      |
+---+--------------------+---------+---------------------------------------+----+------------------+---+--------+---------+----------+
|0  |LARCENY-NON VEHICLE |103040029|610 SPRING ST NW                       |509 |Downtown          |M  |33.77101|-84.38895|2010-10-31|
|1  |AUTO THEFT          |103040061|850 OAK ST SW                          |401 |West End          |T  |33.74057|-84.4168 |2010-10-31|
|2  |LARCENY-FROM VEHICLE|103040169|1344 METROPOLITAN PKWY SW              |301 |Capitol View Manor|X  |33.71803|-84.40774|2010-10-31|
|3  |AUTO THEFT          |103040174|1752 PRYOR RD SW                       |307 |Betmar LaVilla    |Y  |33.70731|-84.39674|2010-10-31|
|4  |LARCENY-NON VEHICLE |103040301|JOHN WESLEY DOBBS A

In [64]:
crimes_per_neighborhood_df = df.groupBy("neighborhood").count()
crimes_per_neighborhood_df.show()

+--------------------+-----+
|        neighborhood|count|
+--------------------+-----+
|    Ben Hill Terrace|  345|
|            Deerwood|  225|
| Westover Plantation|   41|
|           Vine City| 4224|
|     Ben Hill Forest|   35|
|   Lindbergh/Morosgo| 3319|
|The Villages at C...| 1027|
|    East Ardley Road|   37|
|    Whitewater Creek|   41|
|        Orchard Knob|  416|
|          Brookhaven|  207|
|  Sandlewood Estates|  143|
|             Joyland|  469|
|              Bolton|  837|
|            Fairburn|  264|
|   Ridgecrest Forest|  100|
|        Chalet Woods|   60|
|    Buckhead Village| 1055|
|     Cascade Heights|  427|
|          Briar Glen|   92|
+--------------------+-----+
only showing top 20 rows



In [67]:
crimes_per_neighborhood_df.rdd.getNumPartitions()

1

In [69]:
df2 = df.withColumn("month", date_format("date", "MMMM"))
df2.show()

+---+--------------------+---------+--------------------+----+--------------------+---+--------+---------+----------+-------+
| id|               crime|   number|            location|beat|        neighborhood|npu|     lat|     long|      date|  month|
+---+--------------------+---------+--------------------+----+--------------------+---+--------+---------+----------+-------+
|  0| LARCENY-NON VEHICLE|103040029|    610 SPRING ST NW| 509|            Downtown|  M|33.77101|-84.38895|2010-10-31|October|
|  1|          AUTO THEFT|103040061|       850 OAK ST SW| 401|            West End|  T|33.74057| -84.4168|2010-10-31|October|
|  2|LARCENY-FROM VEHICLE|103040169|1344 METROPOLITAN...| 301|  Capitol View Manor|  X|33.71803|-84.40774|2010-10-31|October|
|  3|          AUTO THEFT|103040174|    1752 PRYOR RD SW| 307|      Betmar LaVilla|  Y|33.70731|-84.39674|2010-10-31|October|
|  4| LARCENY-NON VEHICLE|103040301|JOHN WESLEY DOBBS...| 604|     Old Fourth Ward|  M|33.75947|-84.36626|2010-10-31|O

In [71]:
df3 = df2.filter(df2.crime == "AUTO THEFT")
df3.show()

+---+----------+---------+--------------------+----+--------------------+---+--------+---------+----------+-------+
| id|     crime|   number|            location|beat|        neighborhood|npu|     lat|     long|      date|  month|
+---+----------+---------+--------------------+----+--------------------+---+--------+---------+----------+-------+
|  1|AUTO THEFT|103040061|       850 OAK ST SW| 401|            West End|  T|33.74057| -84.4168|2010-10-31|October|
|  3|AUTO THEFT|103040174|    1752 PRYOR RD SW| 307|      Betmar LaVilla|  Y|33.70731|-84.39674|2010-10-31|October|
| 16|AUTO THEFT|103040890|202 RICHARDSON ST...| 303|      Mechanicsville|  V|33.74075|-84.39454|2010-10-31|October|
| 20|AUTO THEFT|103040966|    165 MARION PL NE| 609|            Edgewood|  O|33.75795|-84.34501|2010-10-31|October|
| 23|AUTO THEFT|103040981|    901 BOLTON RD NW| 114|     Bankhead/Bolton|  H|33.77948|-84.49901|2010-10-31|October|
| 29|AUTO THEFT|103041136|     1306 HILL ST SE| 305|      Chosewood Park

In [72]:
df3.rdd.getNumPartitions()

7

In [73]:
df3.collect()

[Row(id=1, crime='AUTO THEFT', number=103040061, location='850 OAK ST SW', beat='401', neighborhood='West End', npu='T', lat=33.74057, long=-84.4168, date=datetime.date(2010, 10, 31), month='October'),
 Row(id=3, crime='AUTO THEFT', number=103040174, location='1752 PRYOR RD SW', beat='307', neighborhood='Betmar LaVilla', npu='Y', lat=33.70731, long=-84.39674, date=datetime.date(2010, 10, 31), month='October'),
 Row(id=16, crime='AUTO THEFT', number=103040890, location='202 RICHARDSON ST SW ', beat='303', neighborhood='Mechanicsville', npu='V', lat=33.74075, long=-84.39454, date=datetime.date(2010, 10, 31), month='October'),
 Row(id=20, crime='AUTO THEFT', number=103040966, location='165 MARION PL NE', beat='609', neighborhood='Edgewood', npu='O', lat=33.75795, long=-84.34501, date=datetime.date(2010, 10, 31), month='October'),
 Row(id=23, crime='AUTO THEFT', number=103040981, location='901 BOLTON RD NW', beat='114', neighborhood='Bankhead/Bolton', npu='H', lat=33.77948, long=-84.49901,