In [None]:
from pyspark.sql import SparkSession;
from pyspark.context import SparkContext;

# warehouse_location points to the default location for managed databases and tables
from os.path import abspath
warehouse_location = abspath('spark-warehouse')

spark = SparkSession \
    . builder \
    .master("local[*]") \
    .appName("ISM6562 PySpark Tutorials") \
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .enableHiveSupport() \
    .getOrCreate()

sc =spark.sparkContext
sc.setLogLevel("ERROR") # only display errors (not warnings)

# note: If you have multiple spark sessions running (like from a previous notebook you've run), 
# this spark session webUI will be on a different port than the default (4040). One way to 
# identify this part is with the following line. If there was only one spark session running, 
# this will be 4040. If it's higher, it means there are still other spark sesssions still running.
spark_session_port = spark.sparkContext.uiWebUrl.split(":")[-1]
print("Spark Session WebUI Port: " + spark_session_port)

In [None]:
spark

### Let's create a pandas dataframe and import it into PySpark

In [1]:
import pandas as pd

In [5]:
df=pd.read_csv("WA_Fn-UseC_-Telco-Customer-Churn.csv")
df.head()

Unnamed: 0,customerID,gender,SeniorCitizen,Partner,Dependents,tenure,PhoneService,MultipleLines,InternetService,OnlineSecurity,...,DeviceProtection,TechSupport,StreamingTV,StreamingMovies,Contract,PaperlessBilling,PaymentMethod,MonthlyCharges,TotalCharges,Churn
0,7590-VHVEG,Female,0,Yes,No,1,No,No phone service,DSL,No,...,No,No,No,No,Month-to-month,Yes,Electronic check,29.85,29.85,No
1,5575-GNVDE,Male,0,No,No,34,Yes,No,DSL,Yes,...,Yes,No,No,No,One year,No,Mailed check,56.95,1889.5,No
2,3668-QPYBK,Male,0,No,No,2,Yes,No,DSL,Yes,...,No,No,No,No,Month-to-month,Yes,Mailed check,53.85,108.15,Yes
3,7795-CFOCW,Male,0,No,No,45,No,No phone service,DSL,Yes,...,Yes,Yes,No,No,One year,No,Bank transfer (automatic),42.3,1840.75,No
4,9237-HQITU,Female,0,No,No,2,Yes,No,Fiber optic,No,...,No,No,No,No,Month-to-month,Yes,Electronic check,70.7,151.65,Yes


In [None]:
df_spark = spark.createDataFrame(df)

df_spark.show()

In [None]:
df_spark.printSchema()

###  Export a PySpark dataframe into a pandas dataframe.

In [None]:
df2=df_spark.toPandas()
df2.show()

In [None]:
# spark.stop()

### List the Database and tables in it in the Spark Warehouse

In [None]:
spark.catalog.listTables()  # If no database is specified, the current database and catalog are used. This API includes all temporary views.

In [None]:
df1=spark.sql("show databases")
df1.show()

In [None]:
tables=spark.sql("show tables")
tables.show()

### Load the dataset into the spark datawarehouse

 We will use the spark dataframe API to load the data. We will then use the spark sql API to create a table from the dataframe.

In [None]:
telcochurn = spark.read.csv('data/telcochurn.csv', header=True, inferSchema=True);

# display the first 5 rows of the dataframe
telcochurn.show(5);

In [None]:
telcochurn.createOrReplaceTempView("telcochurn_tmp_view")  # create a tempview table for the telco data

In [None]:
df = spark.sql("SELECT * FROM telcochurn_tmp_view")
df.show(5)

In [None]:
tables = spark.sql("show tables").show()

In [None]:
df = spark.sql("SELECT * FROM telcochurn_tmp_view") # note that this will generate an error
df.show()

In [None]:
type(telcochurn)

In [None]:
spark.sql("CREATE DATABASE IF NOT EXISTS w10_db;")

In [None]:

#boston.writeTo('boston')

telcochurn.write.mode("overwrite").saveAsTable("w10_db.telcochurn")

#boston.write.mode("overwrite").saveAsTable("boston")


In [None]:
spark.catalog.listTables('w10_db')

In [None]:
df = spark.sql("SELECT * FROM w10_db.telcochurn")
df.show()

In [None]:
# For now, we will keep the table and access it in another notebook. Therefore, this line is commented out
#spark.sql("DROP TABLE telcochurn")

In [None]:
spark.catalog.listTables()

In [None]:
spark.catalog.listTables('w10_db')

In [None]:
# spark.stop()

### Working with Pyspark magic SQL

In [None]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType
# see here for more info on the schema: https://spark.apache.org/docs/latest/sql-programming-guide.html#inferring-the-schema-using-reflection
# and here https://sparkbyexamples.com/pyspark/pyspark-sql-types-datatype-with-examples/

schema = StructType([
    StructField("movieid", IntegerType(), True),
    StructField("title", StringType(), True),
    StructField("date", StringType(), True),
    StructField("unkown", StringType(), True),
    StructField("url", StringType(), True),
    ])

churn = spark.read.csv('data/telcochurn.csv', header=False, schema=schema,  sep = '|')

# display the first 5 rows of the dataframe
churn.show(5)

In [None]:
churn.createOrReplaceTempView("movies_tmp")

In [None]:
%load_ext sparksql_magic

# Change the code from here

In [None]:
# demonstration, note that when using sparksql, we can save the results in a temporary view
# but this (and other sparksql switches) will not work with VSCode. It will work in Jupyter Notebook.
# %%sparksql --view tempdf
# select * from movies_tmp limit 10

In [None]:
# We can use sparksql to show current tables, but this will only work in Jupyter Notebook. It will 
# not work in VSCode.
#%%sparksql 
#SHOW TABLES

In [None]:
spark_df = spark.sql("""SELECT
  movieid,
  title
FROM movies_tmp"""
)
spark_df.show()

In [None]:
spark_df.write.saveAsTable("movies", mode='overwrite')

In [None]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType
# see here for more info on the schema: https://spark.apache.org/docs/latest/sql-programming-guide.html#inferring-the-schema-using-reflection
# and here https://sparkbyexamples.com/pyspark/pyspark-sql-types-datatype-with-examples/

schema = StructType([
    StructField("userid", IntegerType(), True),
    StructField("movieid", IntegerType(), True),
    StructField("rating", IntegerType(), True),
    StructField("timestamp", StringType(), True),
    ])

movierating = spark.read.csv('data/u.data', header=False, schema=schema,  sep = '\t')

# display the first 5 rows of the dataframe
movierating.show(5)

In [None]:
movierating.write.saveAsTable("movieratings", mode='overwrite')

In [None]:
%%sparksql
select * from movieratings limit 10

In [None]:
dfRating = spark.table('movieratings')
dfMovies = spark.table('movies')

In [None]:
# for more on colaborative filtering, see here https://spark.apache.org/docs/2.2.0/ml-collaborative-filtering.html
# 
from pyspark.ml.recommendation import ALS
 
#split training and testing
(dftraining, dftest) = dfRating.randomSplit([0.8, 0.2])
 
## Build the recommendation model using ALS on the training data
als = ALS(maxIter=5, regParam=0.01, userCol="userid", 
    itemCol="movieid", ratingCol="rating",
    coldStartStrategy="drop")
model = als.fit(dftraining)
 
#display predicted rating
predictions = model.transform(dftest)
predictions.show()
 

In [None]:
# spark.stop()