In [30]:
# libraries
import findspark
findspark.init()

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

In [None]:
# Reuse the running SparkContext if there is one: a plain SparkContext() raises
# when this cell is executed a second time in the same kernel.
sc = SparkContext.getOrCreate()

# Create (or fetch) the Spark session used by the rest of the notebook
spark = (
    SparkSession.builder
    .appName("Saving and Loading a SparkML Model")
    .getOrCreate()
)

In [None]:
!wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0321EN-SkillsNetwork/Bigdata%20and%20Spark/searchterms.csv

In [9]:
# Read searchterms.csv into a Spark DataFrame, treating the first line as the
# header and letting Spark infer each column's type
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('searchterms.csv')
)

In [13]:
# Total number of rows in the search-terms DataFrame
df.count()

10000

In [14]:
# Number of columns in the search-terms DataFrame
len(df.columns)

4

In [15]:
# Preview the first 5 rows
df.show(5)

+---+-----+----+--------------+
|day|month|year|    searchterm|
+---+-----+----+--------------+
| 12|   11|2021| mobile 6 inch|
| 12|   11|2021| mobile latest|
| 12|   11|2021|   tablet wifi|
| 12|   11|2021|laptop 14 inch|
| 12|   11|2021|     mobile 5g|
+---+-----+----+--------------+
only showing top 5 rows



In [22]:
# Data type of the 'searchterm' column according to the DataFrame schema
df.schema['searchterm'].dataType

StringType()

In [24]:
# Expose the DataFrame to Spark SQL as the temporary view 'df_view'
df.createOrReplaceTempView('df_view')

# Count how many rows have the exact search term 'gaming laptop'
gaming_laptop_query = "SELECT COUNT(*) as search_times FROM df_view WHERE searchterm = 'gaming laptop'"
spark.sql(gaming_laptop_query).show()

+------------+
|search_times|
+------------+
|         499|
+------------+



In [26]:
# The five search terms with the highest number of occurrences
top_terms_query = "SELECT searchterm, COUNT(*) as times FROM df_view GROUP BY searchterm ORDER BY times DESC LIMIT 5"
spark.sql(top_terms_query).show()

+-------------+-----+
|   searchterm|times|
+-------------+-----+
|mobile 6 inch| 2312|
|    mobile 5g| 2301|
|mobile latest| 1327|
|       laptop|  935|
|  tablet wifi|  896|
+-------------+-----+



In [28]:
!wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0321EN-SkillsNetwork/Bigdata%20and%20Spark/model.tar.gz
!tar -xvzf model.tar.gz

--2024-05-14 10:42:51--  https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0321EN-SkillsNetwork/Bigdata%20and%20Spark/model.tar.gz
Resolving cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)... 169.63.118.104
Connecting to cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)|169.63.118.104|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1490 (1.5K) [application/x-tar]
Saving to: ‘model.tar.gz.1’


2024-05-14 10:42:52 (38.5 MB/s) - ‘model.tar.gz.1’ saved [1490/1490]

sales_prediction.model/
sales_prediction.model/metadata/
sales_prediction.model/metadata/part-00000
sales_prediction.model/metadata/.part-00000.crc
sales_prediction.model/metadata/_SUCCESS
sales_prediction.model/metadata/._SUCCESS.crc
sales_prediction.model/data/
sales_prediction.model/data/part-00000-1db9fe2f-4d93-4b1f-966b-3b09e72d664e-c00

In [62]:
# ML libraries
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegressionModel
from pyspark.sql.functions import round

In [63]:
# Restore the saved linear-regression model from the extracted directory
model_path = 'sales_prediction.model'
sales_prediction_model = LinearRegressionModel.load(model_path)

In [64]:
# Open the parquet file stored inside the model directory and print its schema
sales_prediction_parquet = 'sales_prediction.model/data/part-00000-1db9fe2f-4d93-4b1f-966b-3b09e72d664e-c000.snappy.parquet'
sales_prediction_df = spark.read.format('parquet').load(sales_prediction_parquet)
sales_prediction_df.printSchema()

root
 |-- intercept: double (nullable = true)
 |-- coefficients: vector (nullable = true)
 |-- scale: double (nullable = true)



In [65]:
def predict(year):
    """Print the model's sales prediction for a given year, rounded to 3 decimals.

    Parameters
    ----------
    year
        Year to predict sales for (e.g. 2023); it is the only input feature.

    Returns
    -------
    None
        The result is displayed as a one-column table named
        ``rounded_prediction``.

    Notes
    -----
    Relies on the notebook-level ``spark`` session and
    ``sales_prediction_model``.
    """
    # Namespaced import so the rounding below never depends on the notebook
    # having rebound the global name ``round`` to PySpark's version.
    from pyspark.sql import functions as F

    # Single-row DataFrame; the 0 is only a placeholder for the sales column.
    input_df = spark.createDataFrame([[year, 0]], ["year", "sales"])

    # Pack the year into the 'features' vector column, then keep only the
    # feature vector and the sales column.
    assembler = VectorAssembler(inputCols=["year"], outputCol="features")
    features_df = assembler.transform(input_df).select('features', 'sales')

    predictions = sales_prediction_model.transform(features_df)
    predictions.select(
        F.round(predictions['prediction'], 3).alias('rounded_prediction')
    ).show()


In [66]:
# Show the predicted sales for the year 2023
predict(2023)

+------------------+
|rounded_prediction|
+------------------+
|           175.166|
+------------------+

