In [82]:
# !pip install pyspark
import pandas as pd

In [83]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

#### Starting Spark Session

In [84]:
# Create (or reuse) the SparkSession that the cells below rely on.
spark = (
    SparkSession.builder
    .appName('Practice')
    .getOrCreate()
)

In [85]:
# Display the active SparkSession object.
spark

#### Checking Data Types

In [89]:
# Load the Big Mac CSV with a header but WITHOUT schema inference, so every column
# is read as a string. Loading here keeps this cell runnable on a fresh kernel;
# previously `df` only existed because of a later cell that had been run earlier.
df = spark.read.option('header', 'true').csv('bigmac.csv')
type(df)

pyspark.sql.dataframe.DataFrame

In [90]:
# The printed schema is all strings (see output): the frame was read without inferSchema.
df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Price in US Dollars: string (nullable = true)



In [91]:
# Re-read with schema inference so 'Price in US Dollars' becomes a double.
# NOTE: 'Date' values look like '1/2016' (month/year). The former dateFormat option
# ('M/d/y') did not match that layout and had no effect (Date still came back as a
# string), so it is dropped. Date therefore stays a string: any min/max/sort on it
# is lexicographic, not chronological. Convert explicitly with
# to_date(col('Date'), 'M/yyyy') if a real date column is needed.
df = (
    spark.read
    .option('header', 'true')
    .option('inferSchema', 'true')
    .csv('bigmac.csv')
)

df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Price in US Dollars: double (nullable = true)



#### Basic Commands

In [92]:
# Column names, in schema order.
df.columns

['Date', 'Country', 'Price in US Dollars']

In [93]:
# select() is lazy: it returns a new DataFrame (shown only by its repr below);
# no rows are computed until an action such as show() is called.
df.select('Country')

DataFrame[Country: string]

In [94]:
# First 5 rows of two columns; select() also accepts column names as varargs.
df.select('Country', 'Price in US Dollars').show(5)

+---------+-------------------+
|  Country|Price in US Dollars|
+---------+-------------------+
|Argentina|               2.39|
|Australia|               3.74|
|   Brazil|               3.35|
|  Britain|               4.22|
|   Canada|               4.14|
+---------+-------------------+
only showing top 5 rows



In [95]:
# List of (column name, Spark type name) pairs.
df.dtypes

[('Date', 'string'), ('Country', 'string'), ('Price in US Dollars', 'double')]

In [96]:
# Summary statistics per column. Date is a string here, so its min/max are
# lexicographic ('7/2015' sorts after '1/2016'), not chronological.
df.describe().show()

+-------+------+---------+-------------------+
|summary|  Date|  Country|Price in US Dollars|
+-------+------+---------+-------------------+
|  count|   652|      652|                652|
|   mean|  null|     null|  3.742852760736198|
| stddev|  null|     null| 1.2885335583427657|
|    min|1/2010|Argentina|               0.66|
|    max|7/2015|  Vietnam|               9.08|
+-------+------+---------+-------------------+



In [97]:
# Adding a column: withColumn returns a NEW DataFrame; df itself is left unchanged.
price_copy = df['Price in US Dollars']
df.withColumn('Price (US)', price_copy).show(5)

+------+---------+-------------------+----------+
|  Date|  Country|Price in US Dollars|Price (US)|
+------+---------+-------------------+----------+
|1/2016|Argentina|               2.39|      2.39|
|1/2016|Australia|               3.74|      3.74|
|1/2016|   Brazil|               3.35|      3.35|
|1/2016|  Britain|               4.22|      4.22|
|1/2016|   Canada|               4.14|      4.14|
+------+---------+-------------------+----------+
only showing top 5 rows



In [98]:
# Dropping a column. The 'Price (US)' column from the previous cell was never
# assigned back to df, so dropping it from df alone would silently do nothing.
# Add it and drop it in one chain so the drop is actually demonstrated.
df.withColumn('Price (US)', df['Price in US Dollars']).drop('Price (US)').show(5)

+------+---------+-------------------+
|  Date|  Country|Price in US Dollars|
+------+---------+-------------------+
|1/2016|Argentina|               2.39|
|1/2016|Australia|               3.74|
|1/2016|   Brazil|               3.35|
|1/2016|  Britain|               4.22|
|1/2016|   Canada|               4.14|
+------+---------+-------------------+
only showing top 5 rows



In [99]:
# Renaming a column (returns a new DataFrame; df keeps its 'Country' column).
renamed = df.withColumnRenamed('Country', 'Countries')
renamed.show(5)

+------+---------+-------------------+
|  Date|Countries|Price in US Dollars|
+------+---------+-------------------+
|1/2016|Argentina|               2.39|
|1/2016|Australia|               3.74|
|1/2016|   Brazil|               3.35|
|1/2016|  Britain|               4.22|
|1/2016|   Canada|               4.14|
+------+---------+-------------------+
only showing top 5 rows



#### Groupby Aggregate Operations

In [103]:
# Total Big Mac price per country, highest first. 'Price in US Dollars' is the
# only numeric column, so a dict-based agg yields the same column as groupBy().sum().
test = df.groupBy('Country').agg({'Price in US Dollars': 'sum'})
(
    test
    .withColumnRenamed('sum(Price in US Dollars)', 'sum')
    .sort('sum', ascending=False)
    .show(5)
)

+-----------+------------------+
|    Country|               sum|
+-----------+------------------+
|     Norway| 84.44999999999999|
|Switzerland|             82.53|
|     Sweden|             71.47|
|    Denmark|             61.59|
|     Brazil|61.319999999999986|
+-----------+------------------+
only showing top 5 rows



### SPARK SQL

In [104]:
# Register df as a temporary view named 'my_spark_table' so the spark.sql query
# below can reference it by that name.
df.createOrReplaceTempView("my_spark_table")

In [105]:
# Same per-country totals as the DataFrame version above, written in Spark SQL.
# GROUP BY 1 / ORDER BY 2 refer to positions in the SELECT list.
query = """
SELECT Country, SUM(`Price in US Dollars`) as sum
FROM my_spark_table
GROUP BY 1
ORDER BY 2 DESC
"""
spark.sql(query).show(5)

+-----------+------------------+
|    Country|               sum|
+-----------+------------------+
|     Norway| 84.44999999999999|
|Switzerland|             82.53|
|     Sweden|             71.47|
|    Denmark|             61.59|
|     Brazil|61.319999999999986|
+-----------+------------------+
only showing top 5 rows



### Spark DataFrame Basics

In [106]:
from pyspark.sql import SparkSession
# getOrCreate() reuses an already-active session (one was created at the top of the
# notebook), so this does not start a fresh session.
# NOTE(review): the new appName likely has no effect on the running app; confirm.
spark = SparkSession.builder.appName('Basics').getOrCreate()

In [107]:
# Forward slashes work on every OS. The previous backslash path only worked on
# Windows and relied on '\S' / '\p' being (warning-emitting) invalid escape sequences.
df = spark.read.json('Python-and-Spark-for-Big-Data-master/Spark_DataFrames/people.json')

In [108]:
# Schema inferred from the JSON (see output: age came back as long).
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



### Data Types

In [109]:
from pyspark.sql.types import (StructField, StringType, IntegerType, StructType)

In [110]:
# Explicit field definitions: age as IntegerType, name as StringType, both nullable.
data_schema = [
    StructField('age', IntegerType(), nullable=True),
    StructField('name', StringType(), nullable=True),
]

In [111]:
# Wrap the field list in a StructType so it can be passed as a reader schema.
final_struc = StructType(data_schema)

In [112]:
# JSON schema inference does work (the schema above inferred age as long), but an
# explicit schema lets us choose the types ourselves, e.g. age as integer.
# Forward slashes keep the path portable across operating systems.
df = spark.read.json(
    'Python-and-Spark-for-Big-Data-master/Spark_DataFrames/people.json',
    schema=final_struc,
)

In [113]:
# Schema now follows final_struc (age: integer instead of the inferred long).
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)



### Selecting Columns and Filtering Rows

In [114]:
# Load the AAPL stock CSV with a header row and inferred column types.
df = (
    spark.read
    .option('header', True)
    .option('inferSchema', True)
    .csv('Python-and-Spark-for-Big-Data-master/Spark_DataFrames/appl_stock.csv')
)

In [115]:
# Inferred stock schema: Date as date, prices as double, Volume as integer.
df.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [116]:
# Filtering with Column expressions (the SQL-string form df.filter('Close < 500')
# is equivalent).
low_close = df.filter(df['Close'] < 500)
low_close.select(['Volume', 'Open']).show(3)

# Multiple conditions combine with & / |, each wrapped in parentheses, e.g.:
# df.filter((df['Open'] > 200) & (df['Close'] < 500)).show(3)

+---------+----------+
|   Volume|      Open|
+---------+----------+
|123432400|213.429998|
|150476200|214.599998|
|138040000|214.379993|
+---------+----------+
only showing top 3 rows



In [117]:
# collect() brings the matching rows back as a Python list of Row objects.
# Exact float equality works here only because 197.16 is stored exactly as read.
low_mask = df['Low'] == 197.16
result = df.filter(low_mask).collect()

In [118]:
# List of Row objects returned by collect().
result

[Row(Date=datetime.date(2010, 1, 22), Open=206.78000600000001, High=207.499996, Low=197.16, Close=197.75, Volume=220441900, Adj Close=25.620401)]

In [119]:
# First matching Row (the output above shows exactly one).
row = result[0]

In [120]:
# Convert the Row into a plain Python dict keyed by column name.
row.asDict()

{'Date': datetime.date(2010, 1, 22),
 'Open': 206.78000600000001,
 'High': 207.499996,
 'Low': 197.16,
 'Close': 197.75,
 'Volume': 220441900,
 'Adj Close': 25.620401}

In [121]:
# Rows support lookup by field name directly; no dict conversion needed.
row['Volume']

220441900

### Group By and Aggregate Functions

In [122]:
# Reuse the active session (getOrCreate) and load the sales data with a header
# row and inferred types.
spark = SparkSession.builder.appName('aggs').getOrCreate()
df = spark.read.csv('Python-and-Spark-for-Big-Data-master/Spark_DataFrames/sales_info.csv', inferSchema=True, header=True)

In [123]:
# Peek at the first 5 rows.
df.show(5)

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|    Sam|200.0|
|   GOOG|Charlie|120.0|
|   GOOG|  Frank|340.0|
|   MSFT|   Tina|600.0|
|   MSFT|    Amy|124.0|
+-------+-------+-----+
only showing top 5 rows



In [124]:
# Per-company average of every numeric column. avg() is an alias of mean(), and
# sum(), count(), max() and min() follow the same pattern.
df.groupBy('Company').avg().show()

+-------+-----------------+
|Company|       avg(Sales)|
+-------+-----------------+
|   APPL|            370.0|
|   GOOG|            220.0|
|     FB|            610.0|
|   MSFT|322.3333333333333|
+-------+-----------------+



In [125]:
# agg() with a {column: function} dict aggregates over the whole DataFrame (no grouping).
for agg_spec in ({'Sales': 'sum'}, {'Sales': 'max'}):
    df.agg(agg_spec).show()

+----------+
|sum(Sales)|
+----------+
|    4327.0|
+----------+

+----------+
|max(Sales)|
+----------+
|     870.0|
+----------+



In [126]:
from pyspark.sql.functions import *

### Aliases and Formatting Numbers

In [127]:
# Number of distinct Sales values.
distinct_sales = df.select(countDistinct('Sales'))
distinct_sales.show()

# alias() renames the computed column in the output header.
mean_sales_col = avg('Sales').alias('Average sales')
df.select(mean_sales_col).show()

+---------------------+
|count(DISTINCT Sales)|
+---------------------+
|                   11|
+---------------------+

+-----------------+
|    Average sales|
+-----------------+
|360.5833333333333|
+-----------------+



In [128]:
# format_number rounds to 2 decimal places and returns a string column; alias it
# back to the same header.
avg_sales = df.select(avg('Sales').alias('Average sales'))
(
    avg_sales
    .select(format_number('Average sales', 2).alias('Average sales'))
    .show()
)

+-------------+
|Average sales|
+-------------+
|       360.58|
+-------------+



In [129]:
# Ascending sort by Sales (sort() is an alias of orderBy()).
df.sort('Sales').show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|Charlie|120.0|
|   MSFT|    Amy|124.0|
|   APPL|  Linda|130.0|
|   GOOG|    Sam|200.0|
|   MSFT|Vanessa|243.0|
|   APPL|   John|250.0|
|   GOOG|  Frank|340.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   MSFT|   Tina|600.0|
|   APPL|   Mike|750.0|
|     FB|   Carl|870.0|
+-------+-------+-----+



In [130]:
# Descending sort by Sales via the Column.desc() sort expression.
df.sort(df.Sales.desc()).show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|     FB|   Carl|870.0|
|   APPL|   Mike|750.0|
|   MSFT|   Tina|600.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   GOOG|  Frank|340.0|
|   APPL|   John|250.0|
|   MSFT|Vanessa|243.0|
|   GOOG|    Sam|200.0|
|   APPL|  Linda|130.0|
|   MSFT|    Amy|124.0|
|   GOOG|Charlie|120.0|
+-------+-------+-----+



### Missing Data

In [131]:
# Reuse the active session (getOrCreate) and load a small CSV that contains nulls.
spark = SparkSession.builder.appName('miss').getOrCreate()
df = spark.read.csv('Python-and-Spark-for-Big-Data-master/Spark_DataFrames/ContainsNull.csv', inferSchema=True, header=True)

In [132]:
# Full (4-row) frame, showing where the nulls are.
df.show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [134]:
# thresh=N keeps only rows with at least N non-null values, and overrides `how`.
# emp2 has just one non-null value (Id), so it is the only row dropped.
df.na.drop(thresh=2).show(5)

# how='all': drop a row only if every one of its columns is null (no row here qualifies).
df.na.drop(how='all').show(5)

# subset: only the listed columns are checked, so this drops rows whose Sales is null.
df.na.drop(how='any', subset=['Sales']).show(5)

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [143]:
# Fill missing Sales with 0.
df.na.fill(0, subset=['Sales']).show(5)

# Fill missing Sales with the column mean (nulls are ignored when computing it).
# Compute the mean once as a plain Python value, and reference 'Sales' with the
# schema's exact casing rather than relying on case-insensitive column resolution.
mean_sales = df.select(mean(df['Sales'])).first()[0]
df.na.fill(mean_sales, subset=['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|  0.0|
|emp2| null|  0.0|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| null|400.5|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [142]:
# Replace null names with a placeholder string (restricted to the Name column).
df.na.fill(value='No Name', subset=['Name']).show(5)

+----+-------+-----+
|  Id|   Name|Sales|
+----+-------+-----+
|emp1|   John| null|
|emp2|No Name| null|
|emp3|No Name|345.0|
|emp4|  Cindy|456.0|
+----+-------+-----+



### Dates and Timestamps

In [144]:
# Reload the AAPL stock data for the date/timestamp examples.
# NOTE(review): appName 'miss' is copied from the Missing Data section; getOrCreate()
# reuses the already-active session, so the name likely has no effect here.
spark = SparkSession.builder.appName('miss').getOrCreate()
df = spark.read.csv('Python-and-Spark-for-Big-Data-master/Spark_DataFrames/appl_stock.csv', inferSchema=True, header=True)

In [147]:
# Date is inferred as a date column (see output), so the date functions below apply.
df.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [150]:
# First 3 rows of the Date and Open columns.
df.select('Date', 'Open').show(3)

+----------+----------+
|      Date|      Open|
+----------+----------+
|2010-01-04|213.429998|
|2010-01-05|214.599998|
|2010-01-06|214.379993|
+----------+----------+
only showing top 3 rows



In [155]:
#dayofmonth
# Day-of-month component of each Date.
df.select(dayofmonth(df['Date'])).show(5)

#hour
# Date has no time part, so hour() is 0 for every row (see output).
df.select(hour(df['Date'])).show(5)

#month
# Month component (1-12) of each Date.
df.select(month(df['Date'])).show(5)

+----------------+
|dayofmonth(Date)|
+----------------+
|               4|
|               5|
|               6|
|               7|
|               8|
+----------------+
only showing top 5 rows

+----------+
|hour(Date)|
+----------+
|         0|
|         0|
|         0|
|         0|
|         0|
+----------+
only showing top 5 rows

+-----------+
|month(Date)|
+-----------+
|          1|
|          1|
|          1|
|          1|
|          1|
+-----------+
only showing top 5 rows



In [185]:
# Average closing price per year, ordered by year.
# 1. Extract the year into its own column.
# 2. Aggregate only Close. groupBy().mean() would average every numeric column,
#    including Year itself.
# 3. Format to 2 decimal places under a readable alias.
# Column names match the schema's casing ('Year', 'Close') instead of relying on
# Spark's default case-insensitive resolution.
df_with_year = df.withColumn('Year', year(df['Date']))

yearly_avg_close = (
    df_with_year
    .groupBy('Year')
    .agg(format_number(avg('Close'), 2).alias('Average Closing Price'))
    .orderBy('Year')
)

yearly_avg_close.show()

+----+---------------------+
|Year|Average Closing Price|
+----+---------------------+
|2010|               259.84|
|2011|               364.00|
|2012|               576.05|
|2013|               472.63|
|2014|               295.40|
|2015|               120.04|
|2016|               104.60|
+----+---------------------+



## Machine Learning with Spark

In [None]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

# TODO(review): at this point `df` is still the AAPL stock data, which has no
# Gender / Smoker / Age / Salary columns. Load the intended dataset into `df`
# before running this section.

# StringIndexer maps each distinct string in a column to a numeric index.
# By default, indices follow label frequency (most frequent label -> 0.0), so a
# given category (e.g. female/male) is not guaranteed any particular number.
indexer = StringIndexer(
    inputCols=["Gender", "Smoker"],
    outputCols=["GenderIndex", "SmokerIndex"],
)
df_r = indexer.fit(df).transform(df)
df_r.show()

# OneHotEncoder turns the index into a one-hot vector, so the model treats each
# category as distinct and non-ordinal. In Spark 3 it is an Estimator and must be
# fitted before it can transform; previously it was created but never applied,
# so "GenderVec" never existed.
gender_encoder = OneHotEncoder(inputCol="GenderIndex", outputCol="GenderVec")
df_encoded = gender_encoder.fit(df_r).transform(df_r)

# Combine [Age, GenderVec] into one vector column: the independent features.
feature_assembler = VectorAssembler(
    inputCols=["Age", "GenderVec"], outputCol="Independent Feature"
)
output = feature_assembler.transform(df_encoded)

In [None]:
# Keep only the assembled feature vector and the label column to predict (Salary).
finalized_data = output.select("Independent Feature", "Salary")

### Linear Regression

In [None]:
from pyspark.ml.regression import LinearRegression

# Train/test split (75/25). A fixed seed makes the split, and therefore the fitted
# model and the evaluation below, reproducible across runs.
SPLIT_SEED = 42
train, test = finalized_data.randomSplit([0.75, 0.25], seed=SPLIT_SEED)

# featuresCol must match the VectorAssembler outputCol ("Independent Feature");
# the previous "Independent Features" referred to a non-existent column.
# NOTE: `regressor` is rebound to the fitted model, which the following cells use
# for coefficients / intercept / evaluate.
regressor = LinearRegression(featuresCol="Independent Feature", labelCol="Salary")
regressor = regressor.fit(train)

In [None]:
# Fitted coefficients, one per element of the assembled feature vector.
regressor.coefficients

In [None]:
# Fitted intercept term of the linear model.
regressor.intercept

In [None]:
# Evaluate the fitted model on the held-out split and show its predictions
# DataFrame (presumably the test rows plus a prediction column — confirm).
pred_results = regressor.evaluate(test)
predictions_df = pred_results.predictions
predictions_df.show()

### Logistic Regression

In [None]:
from pyspark.ml.classification import LogisticRegression

### Decision Trees, Random Forests

### K-means Clustering

### Collaborative filtering for Recommender Systems

### Natural Language Processing (NLP)