# ETL with PySpark SQL

In [1]:
import os
import sys

# Point PySpark at its installation. A SPARK_HOME already present in the
# environment takes precedence; otherwise fall back to the author's local
# pyenv install.
# NOTE(review): the fallback path is machine-specific -- on any other machine,
# export SPARK_HOME before launching Jupyter instead of editing this path.
os.environ.setdefault(
    "SPARK_HOME",
    "/Users/projects/.pyenv/versions/3.7.10/envs/tatapower/lib/python3.7/site-packages/pyspark",
)

# Optional settings for a cluster install (e.g. Cloudera parcels); uncomment
# and adapt when not using the pip-installed pyspark above.
# os.environ["HADOOP_HOME"] = ""
# os.environ["PYSPARK_PYTHON"] = "/opt/cloudera/parcels/Anaconda/bin/python"
# os.environ["JAVA_HOME"] = "/usr/java/jdk1.8.0_161/jre"
# os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
# sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.10.6-src.zip")
# sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")

### Importing and creating a SparkSession

In [8]:
from pyspark import SparkFiles
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [5]:
# Create (or reuse, via getOrCreate) a local SparkSession using all cores.
# NOTE(review): spark.executor.logs.rolling.time.interval only takes effect when
# spark.executor.logs.rolling.strategy is "time" -- confirm it is needed here.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("ETL")
    .config("spark.executor.logs.rolling.time.interval", "daily")
    .getOrCreate()
)


21/08/05 15:25:01 WARN Utils: Your hostname, Ramjees-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.1.7 instead (on interface en0)
21/08/05 15:25:01 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
21/08/05 15:25:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/08/05 15:25:04 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


## Setting up the filesystem and files

Load all the CSV files of the Higgs Twitter dataset (http://snap.stanford.edu/data/higgs-twitter.html).

Read each of the five gzip-compressed files into its own Spark DataFrame.

In [10]:
# Social Network edgelist

# Gzip-compressed, space-separated edge list
file = "HiggsTwitter/higgs-social_network.edgelist.gz"

# Two integer columns: the user who follows and the user being followed
schema = (
    StructType()
    .add("follower", IntegerType())
    .add("followed", IntegerType())
)

# Load it with the explicit schema and a single-space separator
socialDF = spark.read.schema(schema).option("sep", " ").csv(file)

In [None]:
# Retweet Network

# First, we set the filename
file = "HiggsTwitter/higgs-retweet_network.edgelist.gz"

# Second, set the schema: tweeter, tweeted and occur, all of type integer.
schema = StructType([
    StructField("tweeter", IntegerType()),
    StructField("tweeted", IntegerType()),
    StructField("occur", IntegerType()),
])

# Create the DataFrame, read with the same options as the social network
# edgelist (single-space separator, explicit schema).
retweetDF = spark.read.csv(path=file, sep=" ", schema=schema)

In [None]:
# Reply Network

# First, we set the filename
file = "HiggsTwitter/higgs-reply_network.edgelist.gz"

# Second, set the schema: replier, replied and occur, all of type integer.
schema = StructType([
    StructField("replier", IntegerType()),
    StructField("replied", IntegerType()),
    StructField("occur", IntegerType()),
])

# Create the DataFrame, read with the same options as the social network
# edgelist (single-space separator, explicit schema).
replyDF = spark.read.csv(path=file, sep=" ", schema=schema)


In [None]:
# Mention Network

# First, we set the filename
file = "HiggsTwitter/higgs-mention_network.edgelist.gz"

# Second, set the schema: mentioner, mentioned and occur, all of type integer.
schema = StructType([
    StructField("mentioner", IntegerType()),
    StructField("mentioned", IntegerType()),
    StructField("occur", IntegerType()),
])

# Create the DataFrame, read with the same options as the social network
# edgelist (single-space separator, explicit schema).
mentionDF = spark.read.csv(path=file, sep=" ", schema=schema)

In [None]:
# Activity Time

# First, we set the filename
file = "HiggsTwitter/higgs-activity_time.txt.gz"

# Second, set the schema where
#    * first column is userA (integer)
#    * second is userB (integer)
#    * third is timestamp (integer)
#    * fourth is interaction (string): Interaction can be: RT (retweet), MT (mention) or RE (reply)
schema = StructType([
    StructField("userA", IntegerType()),
    StructField("userB", IntegerType()),
    StructField("timestamp", IntegerType()),
    StructField("interaction", StringType()),
])

# Create the DataFrame, read with the same options as the social network
# edgelist (single-space separator, explicit schema).
activityDF = spark.read.csv(path=file, sep=" ", schema=schema)

### Convert the CSV DataFrames to Apache Parquet files

In [None]:
# Save all the five files to parquet format

socialDF
retweetDF
replyDF
mentionDF
activityDF

### Load the parquet files into new dataframes

In [None]:
# Read all the five files from parquet format

socialDFpq = spark.read
retweetDFpq = spark.read
replyDFpq = spark.read
mentionDFpq = spark.read
activityDFpq = spark.read

### Working with dataframes

In [None]:
# Display the schema of the dataframes

socialDFpq.printSchema()
retweetDFpq.printSchema()
replyDFpq.printSchema()
mentionDFpq.printSchema()
activityDFpq.printSchema()

In [None]:
# Show the top 5 rows of each dataframe

socialDFpq.show(5)
retweetDFpq.show(5)
replyDFpq.show(5)
mentionDFpq.show(5)
activityDFpq.show(5)

## Spark SQL using DataFrames API

In [None]:
# Users who have most followers: count the followers of each followed user
# and show the 5 largest counts.

(socialDFpq
    .groupBy("followed")
    .agg(count("follower").alias("followers"))
    .orderBy(desc("followers"))
    .show(5))

In [None]:
# Users who have most mentions: total the `occur` column per mentioned user.
# `sum` here is pyspark.sql.functions.sum (from the wildcard import), not the builtin.
(mentionDFpq
    .groupBy("mentioned")
    .agg(sum("occur").alias("mentions"))
    .orderBy(desc("mentions"))
    .show(5))

In [None]:
# Of the top 5 followed users, how many mentions has each one?

# top_f contains "top 5 users who have most followers"
top_f = (socialDFpq
         .groupBy("followed")
         .agg(count("follower").alias("followers"))
         .orderBy(desc("followers"))
         .limit(5))

# Left join so a top user with no mentions still appears, reported as 0.
# sum/coalesce/lit are the pyspark.sql.functions versions (wildcard import).
(top_f
    .join(mentionDFpq, top_f["followed"] == mentionDFpq["mentioned"], "left")
    .groupBy("followed", "followers")
    .agg(coalesce(sum("occur"), lit(0)).alias("mentions"))
    .orderBy(desc("followers"))
    .show())

## Spark SQL using SQL language

Create temporary views of all the DataFrames so we can query them with SQL statements.

In [None]:
# Create temporary views so we can use SQL statements
socialDFpq.
retweetDFpq.
replyDFpq.
mentionDFpq.
activityDFpq.

In [None]:
# List all the tables in spark memory

spark.

In [None]:
# Users who have most followers using SQL
spark.


In [None]:
# Users who have most mentions using SQL
spark.

In [None]:
# Of the top 5 followed users, how many mentions has each one? (using SQL)


## Performance testing

### GZIP Compressed CSV file vs Parquet file

In [None]:
%%time
# GZIP Compressed CSV
socialDF.groupBy("followed").agg(count("follower").alias("followers")).orderBy(desc("followers")).show(5)

In [None]:
%%time
# Parquet file
socialDFpq.groupBy("followed").agg(count("followed").alias("followers")).orderBy(desc("followers")).show(5)

### Cached DF vs not cached DF

This time we cache the two previous DataFrames (socialDF and socialDFpq) and see how much faster the queries run.

In [None]:
# cache dataframes
# cache() only marks each DataFrame for caching; per the note below, the data
# is actually stored during the next query that runs on it.
socialDF.cache()
socialDFpq.cache()

# remove from cache
# (uncomment to release the cached data and go back to uncached timings)
#socialDF.unpersist()
#socialDFpq.unpersist()

- Note: the first query on a cached DataFrame can be slower, because caching is lazy and that run is the one that actually loads the data into memory; subsequent runs should be faster.

In [None]:
%%time
# GZIP Compressed CSV (dataframe cached)
socialDF.groupBy("followed").agg(count("followed").alias("followers")).orderBy(desc("followers")).show(5)

In [None]:
%%time
# Parquet file (dataframe cached)
socialDFpq.groupBy("followed").agg(count("followed").alias("followers")).orderBy(desc("followers")).show(5)