In [1]:
!pip install spark
!pip install pyspark
!pip install findspark
# Import and initialize findspark
import findspark
findspark.init()

Collecting spark
  Downloading spark-0.2.1.tar.gz (41 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/41.0 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m41.0/41.0 kB[0m [31m1.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: spark
  Building wheel for spark (setup.py) ... [?25l[?25hdone
  Created wheel for spark: filename=spark-0.2.1-py3-none-any.whl size=58748 sha256=b766f09f7a97e5fed810c13cb4d3425cac236d61499b8067608a9c1b49744250
  Stored in directory: /root/.cache/pip/wheels/63/88/77/b4131110ea4094540f7b47c6d62a649807d7e94800da5eab0b
Successfully built spark
Installing collected packages: spark
Successfully installed spark-0.2.1
Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m1.4 MB/s[0m eta [36m0:00:00[0m
[?25

In [2]:
# Import packages
from pyspark.sql import SparkSession
# Import the time module so we can time our queries.
import time

# Build (or reuse an existing) SparkSession named "SparkSQL",
# requesting 2 GB of memory for the driver.
spark = (
    SparkSession.builder
    .appName("SparkSQL")
    .config("spark.driver.memory", "2g")
    .getOrCreate()
)

In [3]:
# Load the NYC building-violations CSV from the S3 bucket into a Spark DataFrame.
from pyspark import SparkFiles

url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-classroom/v1.2/22-big-data/3/NYC_Building_Violations.csv"

# Register the remote file with the Spark context, then look up where it was placed locally.
spark.sparkContext.addFile(url)
csv_path = SparkFiles.get("NYC_Building_Violations.csv")

# Comma-separated file whose first row holds the column names.
df = spark.read.options(sep=",", header=True).csv(csv_path)

# Preview the first rows.
df.show()

+----------------+----+-------+-----+-----+----------+-------------------+----------------+------------+--------------------+----------------+--------------------+-------------+--------------------+----------+--------------------+--------------------+--------------------+
|ISN_DOB_BIS_VIOL|BORO|    BIN|BLOCK|  LOT|ISSUE_DATE|VIOLATION_TYPE_CODE|VIOLATION_NUMBER|HOUSE_NUMBER|              STREET|DISPOSITION_DATE|DISPOSITION_COMMENTS|DEVICE_NUMBER|         DESCRIPTION|ECB_NUMBER|              NUMBER|  VIOLATION_CATEGORY|      VIOLATION_TYPE|
+----------------+----+-------+-----+-----+----------+-------------------+----------------+------------+--------------------+----------------+--------------------+-------------+--------------------+----------+--------------------+--------------------+--------------------+
|         2286033|   1|1009713|00577|00019|  20180507|                  E|     9027/627971|          34|        WEST 14TH ST|        20220509|PPN203 AOC SUB 05...|      1P13420|    

In [4]:
# Compute summary statistics for every column, then display them.
summary_df = df.summary()
summary_df.show()

+-------+------------------+------------------+------------------+------------------+------------------+-------------------+-------------------+----------------+------------------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+--------------------+
|summary|  ISN_DOB_BIS_VIOL|              BORO|               BIN|             BLOCK|               LOT|         ISSUE_DATE|VIOLATION_TYPE_CODE|VIOLATION_NUMBER|      HOUSE_NUMBER|            STREET|    DISPOSITION_DATE|DISPOSITION_COMMENTS|       DEVICE_NUMBER|         DESCRIPTION|          ECB_NUMBER|            NUMBER|  VIOLATION_CATEGORY|      VIOLATION_TYPE|
+-------+------------------+------------------+------------------+------------------+------------------+-------------------+-------------------+----------------+------------------+------------------+--------------------+--------------------+--------------------+------

In [5]:
# Create a temporary view from our DataFrame and run SQL that sums the BORO column for each type of violation.
# Register the CSV-backed DataFrame as a temporary SQL view named 'violations'.
df.createOrReplaceTempView('violations')

# Time the query, including the .show() call that displays the result.
# time.perf_counter() is used instead of time.time(): it is a monotonic,
# high-resolution clock meant for measuring elapsed intervals, so the result is
# not skewed if the system clock is adjusted mid-run.
# Because we are timing the execution, run this cell twice to exclude the
# one-time "load time" from the comparison.
start_time = time.perf_counter()

spark.sql("""select VIOLATION_TYPE,sum(BORO) from violations group by VIOLATION_TYPE""").show()

print("--- %s seconds ---" % (time.perf_counter() - start_time))

+--------------------+---------+
|      VIOLATION_TYPE|sum(BORO)|
+--------------------+---------+
|LL10/80-LOCAL LAW...|   3609.0|
|LL11/98-LOCAL LAW...|   9285.0|
|HVIOS-NYCHA ELEV ...|    969.0|
|P-PLUMBING       ...|  29480.0|
|ACH1-(NYCHA) - EL...|   4949.0|
|LANDMRK-LANDMARK ...|   5599.0|
|LL5-LOCAL LAW 5/7...|   1363.0|
|IMD-IMMEDIATE EME...|     13.0|
|B-BOILER         ...|  17042.0|
|FISP-FACADE SAFET...|   6889.0|
|EGNCY-EMERGENCY  ...|  12607.0|
|ES-ELECTRIC SIGNS...|  18378.0|
|                NULL|    148.0|
|L1198-LOCAL LAW 1...|  10656.0|
|HBLVIO-HIGH PRESS...|  14628.0|
|BENCH-FAILURE TO ...| 110285.0|
|RWNRF-RETAINING W...|   4007.0|
|FISPNRF-NO REPORT...|  21017.0|
|LL2604-PHOTOLUMIN...|    679.0|
|LL2604S-SPRINKLER...|   1513.0|
+--------------------+---------+
only showing top 20 rows

--- 10.969998598098755 seconds ---


In [6]:
# Save the DataFrame in parquet format to the 'parquet_violations' directory.
# Note: this works much like writing a CSV out to your local directory.
# The 'overwrite' mode tells Spark to replace any data that already exists there.
df.write.mode('overwrite').parquet('parquet_violations')

In [7]:
# Load the parquet data we just wrote back into a new Spark DataFrame.
p_df = spark.read.format('parquet').load('parquet_violations')

In [8]:
# A DataFrame read from parquet offers the same methods as the CSV-backed one,
# so it can be registered as a temporary SQL view in exactly the same way.
p_df.createOrReplaceTempView(name='p_violations')

In [9]:
# Run the same SQL as above, this time against the parquet-backed view.
# (Note: with small datasets it IS possible that the two timings are very close.)
# time.perf_counter() is used instead of time.time(): it is a monotonic,
# high-resolution clock meant for measuring elapsed intervals, so the result is
# not skewed if the system clock is adjusted mid-run.
# Because we are timing the execution, run this cell twice to exclude the
# one-time "load time" from the comparison.
start_time = time.perf_counter()

spark.sql("""select VIOLATION_TYPE,sum(BORO) from p_violations group by VIOLATION_TYPE""").show()

print("--- %s seconds ---" % (time.perf_counter() - start_time))

+--------------------+---------+
|      VIOLATION_TYPE|sum(BORO)|
+--------------------+---------+
|LL10/80-LOCAL LAW...|   3609.0|
|LL11/98-LOCAL LAW...|   9285.0|
|HVIOS-NYCHA ELEV ...|    969.0|
|P-PLUMBING       ...|  29480.0|
|ACH1-(NYCHA) - EL...|   4949.0|
|LANDMRK-LANDMARK ...|   5599.0|
|LL5-LOCAL LAW 5/7...|   1363.0|
|IMD-IMMEDIATE EME...|     13.0|
|FISP-FACADE SAFET...|   6889.0|
|B-BOILER         ...|  17042.0|
|EGNCY-EMERGENCY  ...|  12607.0|
|ES-ELECTRIC SIGNS...|  18378.0|
|                NULL|    148.0|
|L1198-LOCAL LAW 1...|  10656.0|
|HBLVIO-HIGH PRESS...|  14628.0|
|BENCH-FAILURE TO ...| 110285.0|
|RWNRF-RETAINING W...|   4007.0|
|FISPNRF-NO REPORT...|  21017.0|
|LL2604-PHOTOLUMIN...|    679.0|
|LL2604S-SPRINKLER...|   1513.0|
+--------------------+---------+
only showing top 20 rows

--- 2.096193313598633 seconds ---


In [10]:
import pandas as pd

In [11]:
# Locate the first parquet part file that Spark wrote.
# Spark puts a fresh unique ID in the part-file name on every write, so a
# hardcoded file name breaks as soon as the notebook is re-run. Instead, look
# the file up inside the same relative 'parquet_violations' directory that the
# write cell used (this also avoids depending on the Colab-only /content path).
from pathlib import Path

part_files = sorted(Path("parquet_violations").glob("part-*.parquet"))
if not part_files:
    raise FileNotFoundError(
        "No part-*.parquet file found in 'parquet_violations'; "
        "run the cell that writes the parquet data first."
    )
parquet_file = str(part_files[0])

# Convert the parquet file to a Pandas DataFrame.
parquet_file_df = pd.read_parquet(parquet_file)

In [12]:
# Preview the first five rows of the pandas DataFrame.
parquet_file_df.head(n=5)

Unnamed: 0,ISN_DOB_BIS_VIOL,BORO,BIN,BLOCK,LOT,ISSUE_DATE,VIOLATION_TYPE_CODE,VIOLATION_NUMBER,HOUSE_NUMBER,STREET,DISPOSITION_DATE,DISPOSITION_COMMENTS,DEVICE_NUMBER,DESCRIPTION,ECB_NUMBER,NUMBER,VIOLATION_CATEGORY,VIOLATION_TYPE
0,2286033,1,1009713,577,19,20180507,E,9027/627971,34,WEST 14TH ST,20220509,PPN203 AOC SUB 050322 BY BP ELEV CO ...,1P13420,,,V*050718E9027/627971,V*-DOB VIOLATION - Resolved,E-ELEVATOR ...
1,2533639,1,1082666,333,1,20210629,E,9027/705433,77,COLUMBIA STREET,20220509,PPN203 AOC SUB 050222 BY MIDTOWN ELEV CO INC ...,1P27474,,,V*062921E9027/705433,V*-DOB VIOLATION - Resolved,E-ELEVATOR ...
2,2347979,1,1083846,1130,1,20190423,E,9028/648125,200,CENTRAL PARK WEST,20220509,PPN203 AOC SUBMITTED ON 050522 BY CENTENNIAL E...,1P40861,,,V*042319E9028/648125,V*-DOB VIOLATION - Resolved,E-ELEVATOR ...
3,2566336,1,1057155,1889,7502,20211123,E,9028/710097,845,WEST END AVE,20220509,PPN203 AOC SUB 050322 BY BP ELEV CO ...,1P14972,,,V*112321E9028/710097,V*-DOB VIOLATION - Resolved,E-ELEVATOR ...
4,2487351,1,1041456,1387,21,20200925,E,9028/689200,31,E 72 ST,20220509,PPN203 AOC SUB 050322 BY BP ELEV CO ...,1P10910,,,V*092520E9028/689200,V*-DOB VIOLATION - Resolved,E-ELEVATOR ...
