In [None]:
# Set the PySpark connection.
# findspark.init() runs before `import pyspark` so that the located Spark
# installation is importable when pyspark is loaded.
import findspark
findspark.init()
import pyspark
# Last expression of the cell: shows the Spark installation findspark located.
# (The earlier duplicate call was removed; its result was neither shown nor used.)
findspark.find()

In [None]:
from pyspark.sql import SparkSession

# Reuse the active session if there is one; otherwise start one named "PySpark_Basics".
spark = (
    SparkSession.builder
    .appName("PySpark_Basics")
    .getOrCreate()
)

In [None]:
# Report the interpreter and Spark versions this notebook is running on.
import sys
print(f'Python version: {sys.version}')
print(f'Spark version: {spark.version}')

### Reading Data from Files (.csv files):

In [None]:
# Reading data from file without schema inference (infer_schema = "false").
# The read is timed so it can be compared with the schema-inferring read.
import time
start = time.perf_counter()  # monotonic clock, intended for measuring elapsed time

file_location = "movie_data_part1.csv"
file_type = "csv"
infer_schema = "false"
first_row_is_header = "true"
delimiter = "|"

# Pipe-delimited CSV whose first row holds the column names.
df = (
    spark.read.format(file_type)
    .option("InferSchema", infer_schema)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .load(file_location)
)

end = time.perf_counter()
time_taken = (end - start)
print(f"Total time taken {time_taken} seconds")

In [None]:
# Print metadata: column names and data types of the frame just loaded.
df.printSchema()

In [None]:
# Reading data from file with schema inference (infer_schema = "true").
# The read is timed so it can be compared with the non-inferring read.
import time
start = time.perf_counter()  # monotonic clock, intended for measuring elapsed time

file_location = "movie_data_part1.csv"
file_type = "csv"
infer_schema = "true"
first_row_is_header = "true"
delimiter = "|"

# Pipe-delimited CSV whose first row holds the column names.
df = (
    spark.read.format(file_type)
    .option("InferSchema", infer_schema)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .load(file_location)
)

end = time.perf_counter()
time_taken = (end - start)
print(f"Total time taken {time_taken} seconds")

### Reading Data from Files (.json files):

In [None]:
# Load the JSON file into a DataFrame.
df_json = spark.read.json('data.json')

In [None]:
# Show the first 5 rows; False disables truncation of long cell values.
df_json.show(5,False)

In [None]:
# Project two columns by name and show the first 4 rows untruncated.
df_json.select('id', 'timestamp').show(4, False)

### Reading Metadata

In [None]:
# Print metadata: column names and data types of df.
df.printSchema()

In [None]:
# Another way of printing the data types: dtypes lists (column name, type) pairs.
df.dtypes

### Counting Records

In [None]:
# Counting the total number of records.
# count() is a Spark action, so it is run once and the result reused
# (previously it ran twice and the first result was discarded).
total_records = df.count()
print(f"Total number of records in the dataset: {total_records}")

In [None]:
# Showing the dataframe: first 5 rows, without truncating long values.
df.show(5, False)

### Subset Columns and View a Glimpse of the Data

In [None]:
# Dropping a few columns. The result is bound to df_dropped; df is left as it was.
dropped_columns = [
    "overview",
    "belongs_to_collection",
    "production_companies",
    "production_countries",
    "status",
    "original_title",
]
df_dropped = df.drop(*dropped_columns)
df_dropped.show(5, False)

In [None]:
# Creating the dataframe with the selected columns (rebinds df to the narrower frame).
select_column = ['id', 'budget', 'popularity', 'release_date', 'revenue', 'title']
df = df.select(select_column)
df.show(5)

In [None]:
# Subsetting columns another way: by positional index.
# The file is re-read first, reusing file_type / infer_schema / first_row_is_header /
# delimiter / file_location as assigned by the earlier reading cells.
df = spark.read.format(file_type)\
.option("inferSchema", infer_schema)\
.option("header", first_row_is_header)\
.option("sep", delimiter)\
.load(file_location)

# Columns are picked by their position in the freshly loaded frame.
# NOTE(review): presumably positions 2, 1, 6, 9, 10, 14 are id, budget, popularity,
# release_date, revenue and title — verify against the file header.
df=df.select(df[2],df[1],df[6],df[9],df[10],df[14])
df.show(5)

### Missing Values

In [None]:
# Identifying missing values for one test variable (popularity):
# a value is treated as missing when it is an empty string, null, or NaN.
from pyspark.sql.functions import *
df.filter(
    (df['popularity'] == '')
    | df['popularity'].isNull()
    | isnan(df['popularity'])
).count()

In [None]:
# Identifying missing values in every column:
# for each column, count the rows whose value is an empty string, null, or NaN,
# and report that count under the column's own name.
df.select([
    count(
        when((col(name) == '') | col(name).isNull() | isnan(name), name)
    ).alias(name)
    for name in df.columns
]).show()

### One-way Frequencies

In [None]:
# One-way frequencies: number of rows per distinct title.
df.groupBy(df['title']).count().show()

In [None]:
# Frequencies in descending order: the 10 most frequent titles.
df.groupby(df['title']).count().sort(desc("count")).show(10)

###### Note: groupby is an alias for groupBy; both variants will work

### Sorting and Filtering One-way frequencies

In [None]:
# One-way frequencies with filters

# first creating a temporary dataframe without empty, null or NaN titles:
df_temp = df.filter((df['title']!='')&(df['title'].isNotNull())&(~isnan(df['title'])))

# build the filtered frequency table once so both actions below share one definition:
frequent_titles = df_temp.groupby(df_temp['title']).count().filter("`count`>4")

# showing the filtered results (most frequent first):
frequent_titles.sort(desc("count")).show(10)

# counting the number of titles that occur more than 4 times
# (ordering does not change a row count, so no sort is needed here):
frequent_titles.count()

In [None]:
# Delete any temporary dataframe that we created in the process.
# This removes the name df_temp from the notebook namespace.
del df_temp

### Casting Variables

In [None]:
# Before casting: current (column name, type) pairs.
df.dtypes

In [None]:
# Casting: replace the budget column with its float-typed version.
df = df.withColumn('budget',df['budget'].cast("float"))

In [None]:
# After casting: budget should now be reported as float.
df.dtypes

In [None]:
# Casting multiple variables
from pyspark.sql.types import *

# Identifying and assigning lists of variables, grouped by their target type
int_vars = ['id']
float_vars = ['budget', 'popularity', 'revenue']
date_vars = ['release_date']

# Walk the groups in the same order as before (ints, floats, dates) and
# replace each column with its cast version.
for columns, spark_type in (
    (int_vars, IntegerType()),
    (float_vars, FloatType()),
    (date_vars, DateType()),
):
    for column in columns:
        df = df.withColumn(column, df[column].cast(spark_type))

In [None]:
# After casting: (column name, type) pairs following the multi-column cast.
df.dtypes

In [None]:
# After casting: first 10 rows of the re-typed frame.
df.show(10)

### Descriptive Statistics

In [None]:
# Describe function: summary statistics for the columns of df.
df.describe().show()

##### Three parameters have to be passed to the *approxQuantile* function, as follows:

* ***col*** - the name of the numerical column
* ***probabilities*** - a list of quantile probabilities between [0,1]: 0: minimum; 0.5: median; 1: maximum
* ***relativeError*** - The relative target precision to achieve (>=0)

In [None]:
# Minimum/Median/Maximum calculation.
# Rows with a zero, null or NaN budget are excluded first.
df_temp = df.filter((df['budget']!=0)&(df['budget'].isNotNull()) & (~isnan(df['budget'])))

# One approxQuantile call with all three probabilities instead of three separate
# Spark jobs: 0.0 -> minimum, 0.5 -> median, 1.0 -> maximum.
quantiles = df_temp.approxQuantile('budget', [0.0, 0.5, 1.0], 0.001)

# Keep each result as a one-element list (empty when there is no data), which is the
# shape the separate single-probability calls produced.
min_val = quantiles[0:1]
median = quantiles[1:2]
max_val = quantiles[2:3]

print (f'The minimum budget is: {min_val} ')
print (f'The median of budget is: {median} ')
print (f'The maximum budget is: {max_val} ')

### Unique/Distinct Values and Counts

In [None]:
# Distinct counts: number of distinct titles, reported in a column named "count".
df.agg(countDistinct(col("title")).alias("count")).show()

In [None]:
# Displaying the distinct values: 10 distinct titles, untruncated.
df.select('title').distinct().show(10, False)

In [None]:
# Distinct aggregations (time)
# First check what the release date column looks like:
sel_column = ['release_date']
df_date = df.select(*sel_column)
df_date.show(5)

In [None]:
# Add a release_year column extracted from release_date.
# The result is bound to df_temp; df itself is not rebound.
df_temp = df.withColumn('release_year', year('release_date')) 
df_temp.show(10)

In [None]:
# Number of distinct titles per release year.
df_temp.groupBy("release_year").agg(countDistinct("title")).show(10, False)

In [None]:
# Datetime extractions: add release_month, then count distinct titles per month.
df_temp=df_temp.withColumn('release_month',month('release_date'))
df_temp.groupBy("release_month").agg(countDistinct("title")).show(10, False)

In [None]:
# Add release_day (day of the month), then count distinct titles per day.
df_temp=df_temp.withColumn('release_day',dayofmonth('release_date'))
df_temp.groupBy("release_day").agg(countDistinct("title")).show(10, False)

In [None]:
# Output of datetime extractions: df_temp now carries release_year, release_month and release_day.
df_temp.show(5, False)

### Filtering

In [None]:
# Filtering based on like (SQL wildcard pattern; % matches any sequence of characters)
# Titles starting with 'Meet' in the following example:
df.filter(df['title'].like('Meet%')).show(10,False)

In [None]:
# Filtering based on not like (~ negates the condition)
# Titles that do not end with 's' in the following example:
df.filter(~df['title'].like('%s')).show(5,False)

In [None]:
# Filtering based on regular expressions - method 1
# NOTE(review): '[A-Z]*' allows zero capital letters, so this pattern looks equivalent
# to a plain "contains 'ove'" match — confirm that is intended.
df.filter(df['title'].rlike('[A-Z]*ove')).show(5,False)
df.filter(df['title'].rlike('[A-Z]*ove')).count()

In [None]:
# Filtering based on the contains function - method 2
# Keeps titles that contain the literal substring 'ove'.
df.filter(df.title.contains('ove')).show(5, False)
df.filter(df.title.contains('ove')).count()

In [None]:
# Filtering based on regular expression - another method
# \w: matches a word character (upper/lowercase letters, digits 0-9 and underscore)
# A raw string is used so the backslash reaches the regex engine unchanged and Python
# does not warn about an invalid "\w" escape sequence.
df.filter(df['title'].rlike(r'\w*ove')).count()

### Creating New Columns

In [None]:
# New Columns
# Mean of popularity: take the first collected row of the aggregate and read its
# 'avg(popularity)' field.
mean_pop=df.agg({'popularity': 'mean'}).collect()[0]['avg(popularity)']
mean_pop

In [None]:
# Total number of rows in df (every row is counted).
count_obs= df.count()
count_obs

In [None]:
# The lit function is a way to interact with column literals. It is very useful 
# when you want to create a column with a value directly.
# Here every row receives the same constant: the mean popularity computed above.
df=df.withColumn('mean_popularity',lit(mean_pop))
df.show(5)

In [None]:
# Squared deviation of each popularity value from the mean popularity.
# NOTE(review): the column name 'varaiance' is a misspelling of 'variance'; the variance
# calculation cell reads the column under this same spelling, so rename both together.
df=df.withColumn('varaiance',pow((df['popularity']-df['mean_popularity']),2))
df.printSchema()

In [None]:
# New columns - output: first 5 rows including mean_popularity and the squared deviation.
df.show(5, False)

In [None]:
# variance calculation:
# Sum of the squared deviations stored in the 'varaiance' column.
variance_sum = df.agg({'varaiance':'sum'}).collect()[0]['sum(varaiance)']
print(f"Variance Summation: {variance_sum}")

# Dividing by (n - 1) gives the sample variance, despite the variable name.
# NOTE(review): count_obs comes from df.count(), which counts every row; if popularity
# contains nulls the divisor may be larger than the number of summed values — confirm.
variance_population = variance_sum/(count_obs - 1)
print(f"Variance: {variance_population}")

In [None]:
# standard deviation: square root of the variance computed above.
import math
math.sqrt(variance_population)

In [None]:
# Apply the user defined function on the dataframe:
# Step 1. First create the user defined function:
def new_cols(budget, popularity):
    """Bucket a film's budget and popularity into category labels.

    Args:
        budget: numeric budget, or None when the value is missing.
        popularity: numeric popularity score, or None when the value is missing.

    Returns:
        A (budget_cat, ratings) tuple.
        budget_cat is 'small' (< 10,000,000), 'medium' (< 100,000,000) or 'big'.
        ratings is 'low' (< 3), 'mid' (< 5) or 'high'.
        A None input yields None for its own label, so rows with missing values
        no longer raise TypeError when compared against the thresholds.
    """
    if budget is None:
        budget_cat = None
    elif budget < 10000000:
        budget_cat = 'small'
    elif budget < 100000000:
        budget_cat = 'medium'
    else:
        budget_cat = 'big'

    if popularity is None:
        ratings = None
    elif popularity < 3:
        ratings = 'low'
    elif popularity < 5:
        ratings = 'mid'
    else:
        ratings = 'high'
    return budget_cat, ratings

In [None]:
# Step 2: Define the field types of the new columns and wrap the function as a UDF.
# The UDF result is a struct with two nullable string fields.
newcat_schema = StructType([
    StructField("budget_cat", StringType(), True),
    StructField("ratings", StringType(), True),
])
udfB = udf(new_cols, newcat_schema)

# Attach the struct result as a single "newcat" column.
temp_df = (
    df.select('id', 'budget', 'popularity')
    .withColumn("newcat", udfB("budget", "popularity"))
)

In [None]:
# Step 3: Unbundle the struct column into individual columns, then drop the struct.
df_with_newcols = (
    temp_df.select('id', 'budget', 'popularity', 'newcat')
    .withColumn('budget_cat', temp_df.newcat.getItem('budget_cat'))
    .withColumn('ratings', temp_df.newcat.getItem('ratings'))
    .drop('newcat')
)
df_with_newcols.show(5, False)

In [None]:
# New columns - observe metadata: temp_df still holds the struct-typed "newcat" column.
temp_df.printSchema()

In [None]:
# Another way of creating the columns: built-in when/otherwise conditions instead of a UDF.
df_with_newcols = (
    df.select('id', 'budget', 'popularity')
    .withColumn(
        'budget_cat',
        when(df['budget'] < 10000000, 'Small')
        .when(df['budget'] < 100000000, 'Medium')
        .otherwise('Big'),
    )
    .withColumn(
        'ratings',
        when(df['popularity'] < 3, 'Low')
        .when(df['popularity'] < 5, 'Mid')
        .otherwise('High'),
    )
)

df_with_newcols.show(5, False)

### Deleting and Renaming Columns:

In [None]:
# Schema before deleting and renaming columns.
df_with_newcols.printSchema()

In [None]:
# Remove 'budget_cat' column:
# the names are kept in a list so further columns can be added to the same drop call.
columns_to_drop = ['budget_cat']
df_with_newcols = df_with_newcols.drop(*columns_to_drop)

# Confirm that the column is gone.
df_with_newcols.printSchema()

In [None]:
# Renaming columns: id -> film_id and ratings -> film_ratings.
df_with_newcols = df_with_newcols.withColumnRenamed('id','film_id').withColumnRenamed('ratings','film_ratings')

# Confirm the new column names.
df_with_newcols.printSchema()

### Don't forget to stop spark when you are done!

In [None]:
# Stop the Spark session created at the top of the notebook.
spark.stop()

### Great job!