In [1]:
# Initialize findspark before anything imports `pyspark` (next cell).
# findspark.init() prepares this kernel so the local Spark installation can be
# imported; presumably it relies on SPARK_HOME or a standard install location.
import findspark

findspark.init()

In [2]:
# Standard library: `time` is used further down to measure how long each SQL query takes.
import time

# Spark's entry point for DataFrame and SQL functionality.
from pyspark.sql import SparkSession

# Build the session named "SparkSQL" with 2g of driver memory; getOrCreate()
# hands back an already-running session instead of starting a second one.
spark = (
    SparkSession.builder
    .appName("SparkSQL")
    .config("spark.driver.memory", "2g")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/08/31 18:46:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [None]:
# Read in data from S3 Bucket
from pyspark import SparkFiles

url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-classroom/v1.2/22-big-data/3/NYC_Building_Violations.csv"

# Register the remote CSV with Spark, then read the local copy whose path
# SparkFiles.get() resolves from the file name. No inferSchema option is
# passed, so Spark's default applies (columns are read as strings).
spark.sparkContext.addFile(url)
local_csv_path = SparkFiles.get("NYC_Building_Violations.csv")
df = spark.read.csv(local_csv_path, header=True, sep=",")
df.show()

[Stage 0:>                                                          (0 + 1) / 1]

In [None]:
# Summary statistics for every column. The statistics are spelled out here;
# they are the same set DataFrame.summary() computes when called with none.
df.summary("count", "mean", "stddev", "min", "25%", "50%", "75%", "max").show()

In [None]:
 # Let's create a view with our DataFrame and run SQL that will sum up the boroughs by the type of violation.
# We can output the time this step runs in seconds.
# Because we are timing the executions, remember to run twice to eliminate the "load time" from the discussion.

df.createOrReplaceTempView('violations')
start_time = time.time()

spark.sql("""select VIOLATION_TYPE, sum(BORO) from violations group by 1""").show()

print("--- %s seconds ---" % (time.time() - start_time))

In [None]:
# Write the data out in Parquet format. This works much like writing a CSV to
# your local directory; mode 'overwrite' replaces any data already at that path.
df.write.mode('overwrite').parquet('parquet_violations')



*   Click the folder icon on the left of the notebook to expose the folders and files stored in your Colab environment. Notice that a new folder is present with the same name as your Parquet output (`parquet_violations`).
*   Inside it you will find `part-*.parquet` files and a `_SUCCESS` file.
*   The `_SUCCESS` file is a marker that Spark writes once the write job has completed successfully.
*   The `part-*` files are binary files that store your compressed data in columnar format.





In [None]:
# Read the Parquet output written above back into a Spark DataFrame.
p_df = spark.read.load('parquet_violations', format='parquet')

In [None]:
# Reading Parquet still yields an ordinary Spark DataFrame (only the on-disk
# storage is columnar), so it supports the same methods, including registering
# a temporary SQL view.
p_df.createOrReplaceTempView(name='p_violations')

In [None]:
# Run the same SQL as above against the Parquet-backed view. (Note: with small
# datasets it IS possible that the two timings are very close.)
# Because we are timing the executions, run this cell twice so the second
# timing excludes the one-off "load time".
# perf_counter() is monotonic and high-resolution, which makes it the right clock
# for elapsed-time measurements (time.time() can jump with system clock changes).

start_time = time.perf_counter()
spark.sql("""select VIOLATION_TYPE, sum(BORO) from p_violations group by 1""").show()
print("--- %s seconds ---" % (time.perf_counter() - start_time))

In [None]:
# Writing a CSV from Spark also produces a folder of "part" files. Unlike the
# Parquet parts, these are neither binary nor compressed: they are ordinary CSV
# files, one per partition. Look for the 'out_violations.csv' folder in your
# local directory. No header option is passed, so the parts presumably contain
# no header row.
df.write.mode('overwrite').csv('out_violations.csv')

### Read a parquet file into a Pandas DataFrame

In [None]:
import pandas as pd

In [None]:
# Load a single Parquet part file written by Spark into pandas.
# Spark embeds an ID in each part-file name that changes every time the data is
# rewritten, so a hard-coded name breaks on re-run. Instead, locate the part
# files by pattern and take the first one in sorted order (part-00000...).
import glob

part_files = sorted(glob.glob("parquet_violations/part-*.parquet"))
if not part_files:
    raise FileNotFoundError(
        "No part-*.parquet files found in 'parquet_violations'; "
        "run the cell that writes the Parquet data first."
    )
parquet_file = part_files[0]

# Convert the parquet file to a Pandas DataFrame.
part_00000_df = pd.read_parquet(parquet_file, engine='auto')
part_00000_df.head()