In [None]:
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder.appName("Basics").getOrCreate()

In [None]:
df = spark.read.json("people.json") #may need whole path file name here

In [None]:
df.show()

In [None]:
df.printSchema()

### Sometimes you need to specify the schema explicitly — especially for sources that are not CSV files. That's what we do below.

In [None]:
from pyspark.sql.types import (StructField, 
                               StringType, IntegerType, StructType)

In [None]:
data_schema = [StructField("age", IntegerType(), True), 
              StructField("name", StringType(), True)] #without True, it will show error if any null values

In [None]:
final_struc = StructType(fields=data_schema)

In [None]:
 df = spark.read.json("people.json", schema=final_struc)

In [None]:
df.printSchema() # This whole process clarified variable type - went from long to integer 

In [None]:
df.columns #attribute, not a method

In [None]:
df.describe() #returns a dataframe
df.describe().show()

### Actual Work Time

In [None]:
df.select("age") #this is a dataframe so it is much easier

In [None]:
df.select("age").show()
df.select(["age", "name"]).show()

In [None]:
# There are specialized types such as Column or Row - due to distributive nature of spark 

In [None]:
#To add column
df.withColumn("double_age", df["age"]*2).show()
df.withColumnRenamed("age", "my_new_age").show() #old column name, new column name 

In [None]:
df.createOrReplaceTempView("people") #registered this as sql temp view

In [None]:
results = spark.sql("SELECT * FROM people")
results.show()

In [None]:
new_results = spark.sql("SELECT * FROM people WHERE age=30")
new_results.show()

### Basic Operations 

In [None]:
df = spark.read.csv("apple_stock.csv", inferSchema=True, header=True) #first row in csv file is column names 

In [None]:
df.filter("Close < 500").select(["Open", "Close"]).show() #filter with sql syntax

In [None]:
df.filter(df["Close"] < 500).show()
df.filter(df["Close"] < 500).select("Volume").show()

In [None]:
result = df.filter(df["low"] == 197.16).collect()

In [None]:
row = result[0]
row.asDict()["Volume"]

### Grouping & Aggregating

In [None]:
df.groupBy("Company")

In [None]:
df.groupBy("Company").mean().show() #or .max()/min/sum/count()

In [None]:
df.agg({"Sales": "sum"}).show()

In [None]:
# second way to do agg using groupBy

group_data = df.groupBy("Company")
group_data.agg({"Sales":"Max"}).show() 

In [None]:
from pyspark.sql.functions import countDistinct, avr, stddev

In [None]:
df.select(countDistinct("Sales").alias("Average Sales")).show() #alias - can give new display name 

In [None]:
from pyspark.sql.functions import format_number

df.select(stddev("Sales")).show()

In [None]:
sales_std = df.select(stddev("Sales").alias("Std"))
sales_std.select(format_number("Std", 2).alias("Final")).show()

In [None]:
df.orderBy("Sales").show() #for ascending 

In [None]:
df.orderBy(df["Sales"].desc()).show() #for descending

### Missing Data

In [None]:
df.na.drop().show() #drops any row that has any missing data

In [None]:
df.na.drop(thresh=2) #shows rows with less than number of values missing

In [None]:
df.na.drop(how="all") #only drops row if everthing is null #default set to any

In [None]:
df.na.drop(subset=["Sales"]).show() #gets it to focus on only one (or multiple) column

In [None]:
df.na.fill("FILL VALUE").show() #fills it for columns that are names

In [None]:
df.na.fill(0).show() #fills in columns that are numbers

In [None]:
df.na.fill("No Name", subset=["Name"]).show()

In [None]:
#Filling in mean value
from pyspark.sql.functions import mean
mean_val = df.select(mean(df["Sales"])).collect()
mean_sales = mean_val[0][0]
df.na.fill(mean_sales, subset=["Sales"]) #don't actually have to say subset= due to ordering

In [None]:
#All of that in one line:
df.na.fill(df.select(mean(df["Sales"])).collect()[0][0],["Sales"]).show()

In [None]:
df = sql.Context.sql("SELECT * FROM mytable") #This is just for DATABRICKS 

### Dates & Timestamps 

In [None]:
from pyspark.sql.functions import (dayofmonth, 
                                   weekofyear, hour, dayofyear, 
                                   format_number, 
                                   month, year, 
                                   date_format)
                                   #whenever using these, it goes like this: 

In [None]:
df.select(dayofmonth(df["Date"])).show() #any of them put in spot of dayofmonth 

In [None]:
#How to create new column with information given by above

new_df = df.withColumn("Year", year(df["Date"])) #.show() when not saving
result = new_df.groupBy("Year").mean().select(["Year","avr(Close)"])
result.select(["Year", format_number("avg(Close)", 2).alias("Average Close")]).show()

### Spark Streaming

In [None]:
from pyspark import SparkContext 
from pyspark.streaming import StreamingContext

In [None]:
sc = SparkContext("local[2]", "NetworkWordCount")  #first is number of threads, and second is name of stream

In [None]:
ssc = StreamingContext(sc, 1) #pass spark context and second is interval in seconds 

In [None]:
lines  = ssc.socketTextStream("localhost", 9999) #This creates datastream, #local host port connection 

In [None]:
words = lines.flatMap(lambda line: line.split(" ")) #allows for mapping

In [None]:
pairs = words.map(lambda word: (word, 1))

In [None]:
word_counts = pairs.reduceByKey(lambda num1, num2: num1+num2) #takes in tuple and reduces by the key

In [None]:
word_counts.pprint()

In [None]:
ssc.start()

In [None]:
#Terminal:

nc -lk 9999

### Machine Learning

#### Linear Regression

In [None]:
from pyspark.ml.regression import LinearRegression

In [None]:
training = spark.read.format("libsvm").load("sample_linear_regression_data.txt") #format used to import different data source type

#So you could do spark.read.csv("sample_linear_regression_data.csv")

In [None]:
lr = LinearRegression() #featuresCol="features", labelCol="label", predictionCol="prediction"
lrModel = lr.fit(training)

In [None]:
lrModel.coefficients
lrModel.intercept 

In [None]:
training_summary = lrModel.summary

In [None]:
training_summary.r2 #.rootMeanSquaredError

##### Train-Test Split

In [None]:
all_data = spark.read.format("libsvm").load("sample_linear_regression_data.txt")

In [None]:
train_data, test_data = all_data.random_split([.75, .25]) #randomSplit

In [None]:
correct_model = lr.fit(train_data)

In [None]:
test_results = correct_model.evaluate(test_data)

In [None]:
test_results.residuals.show() #.rootMeanSquaredError

In [None]:
unlabeled_data = test_data.select("features")

In [None]:
predictions = correct_model.transform(unlabeled_data)

#### More Realistic Example

In [None]:
data = spark.read.csv("ECommerce_Customers", inferSchema=True, header=True)
data.printSchema()

In [None]:
for item in data.head(1)[0]:
    print(item)

In [None]:
from pyspark.ml.linalg import Vectors 
from pyspark.ml.feature import VectorAssembler
data.columns

In [None]:
assembler = VectorAssembler(inputCols=["Avg Session Length", "Time On App"], outputCol="features") #x variable features 

#You have a list of input columns and output column
#assembler makes them into two vectors

In [None]:
output = assembler.transform(data)
output.printSchema()
output.head(1)

In [None]:
final_data = output.select("features", "Yearly Amount Spent")

In [None]:
train_data, test_data = final_data.randomSplit([.75, .25]) #tuple unpacking 

In [None]:
lr = LinearRegression(labelCol="Yearly Amount Spent")

In [None]:
lr_model = lr.fit(train_data)

In [None]:
test_results = lr_model.evaluate(test_data)

In [None]:
test_results.residuals.show()

In [None]:
test_results.rootMeanSquaredError
test_results.r2

In [None]:
#Example with no y variable data
unlabeled_data = test_data.select("features")


In [None]:
predictions = lr_model.transform(unlabeled_data)
predictions.show()

#### Logistic Regression 

In [None]:
from pyspark.ml.classification import LogisticRegression
data = spark.read.format("libsvm").load("file name")

In [None]:
model = LogisticRegression()

In [None]:
fit_model = model.fit(data)

In [None]:
model_summary = fit_model.summary #does label match actual prediction?

In [None]:
model_summary.predictions.show()

#### Logistic Regression with TTS

In [None]:
lr_train, lr_test = data.randomSplit([.7, .3])

In [None]:
new_model = LogisticRegression()

In [None]:
new_model.fit(lr_train)

In [None]:
prediction_and_labels = new_model.evaluate(lr_test)

In [None]:
#Shortcut
prediction_and_labels.predictions.show()

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MultiClassClassificationEvaluator
#metricName

In [None]:
my_eval = BinaryClassificationEvaluator()

In [None]:
final_roc_results = my_eval.evaluate(prediction_and_labels.predictions)

In [None]:
final_roc_results

#### Logistic Regression: Realistic Example 

In [None]:
from pyspark.ml.feature import VectorAssembler, VectorIndexer, OneHotEncoder, StringIndexer

In [None]:
gender_indexer = StringIndexer(inputCol="Sex", outputCol="SexIndex")
gender_encoder = OneHotEncoder(inputCol="SexIndex", outputCol="SexVec")

#essentially dummifying categorical variables

In [None]:
#assembling them (setting up our feature array)
assembler = VectorAssembler(inputCols=["Pclass", "SexVec", "EmbarkVec", "Age"], outputCol="features")

In [None]:
lr_titanic = LogisticRegression(featuresCol="features", labelCol="Survived")

In [None]:
pipeline = Pipeline(stages=[gender_indexer,embark_indexer, gender_encoder, embar_encoder, assembler, lr_titanic])
#Not sure why we are using a pipeline 

In [None]:
train_data, test_data = my_final_data.randomSplit([.7, .3])

In [None]:
fit_model = pipeline.fit(train_data)

In [None]:
results = fit_model.transform(test_data)

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator 

In [None]:
my_eval = BinaryClassificationEvaluator(rawPredictionCol="prediction", labelCol="Survived")

In [None]:
results.select("Survived", "prediction").show()

In [None]:
AUC = my_eval.evaluate(results)

#### Tree Methods (Decision Tree, Random Forest, Gradient Boosting)

In [None]:
from pyspark.ml import Pipeline

In [None]:
from pyspark.ml.classification import RandomForestClassifier, GBTClassifier, DecisionTreeClassifier

#For regression
# from pyspark.ml.regression import RandomForestRegressor

In [None]:
#Same as up above

In [None]:
from pyspark.ml.evaluation import MultiClassClassificationEvaluator

In [None]:
rfc_model.featureImportances

#### K-Means Clustering (Clustering - Unsupervised Learning)

In [None]:
from pyspark.ml.clustering import KMeans

In [None]:
kmeans  = KMeans().setK(2).setSeed(1)

In [None]:
wssse = model.computeCost(final_data) #No TTS - since no labels
#within set sum square errors

In [None]:
print(wssse)

In [None]:
centers = model.clusterCenters()
#To find specific location of centroids

In [None]:
results = model.transform(final_data) 

In [None]:
results.show()