### Analysing search terms on an e-commerce web server


In [1]:
# Install spark
!pip install pyspark
!pip install findspark



In [2]:
# Start session
import findspark
findspark.init()
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

# Creating a spark context class
sc = SparkContext()

# Creating a spark session
spark = SparkSession \
    .builder \
    .appName("Capstone SparkML lab").getOrCreate()

23/06/28 15:10:45 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/06/28 15:10:47 WARN util.Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [23]:
%%bash
# Get data
wget 'https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0321EN-SkillsNetwork/Bigdata%20and%20Spark/searchterms.csv'


--2023-06-28 15:31:40--  https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0321EN-SkillsNetwork/Bigdata%20and%20Spark/searchterms.csv
Resolving cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)... 169.63.118.104
Connecting to cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)|169.63.118.104|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 233457 (228K) [text/csv]
Saving to: ‘searchterms.csv.2’

     0K .......... .......... .......... .......... .......... 21% 26.1M 0s
    50K .......... .......... .......... .......... .......... 43% 32.7M 0s
   100K .......... .......... .......... .......... .......... 65% 38.3M 0s
   150K .......... .......... .......... .......... .......... 87% 40.9M 0s
   200K .......... .......... .......                         100% 28.8M=0.007s

2023-06-28 15:31:40 (32.8 MB/s

In [30]:
# Load the csv into a spark dataframe
import os
data_csv = os.environ.get('searchterms', 'searchterms.csv')
df = spark.read.csv(data_csv, header=True)
df.createOrReplaceTempView('df')

In [35]:
# Print the number of rows and columns
print(f'rows = {df.count()}, columns = {len(df.columns)}')

rows = 10000, columns = 4


In [50]:
# Print the top 5 rows
df.show(5)

+---+-----+----+--------------+
|day|month|year|    searchterm|
+---+-----+----+--------------+
| 12|   11|2021| mobile 6 inch|
| 12|   11|2021| mobile latest|
| 12|   11|2021|   tablet wifi|
| 12|   11|2021|laptop 14 inch|
| 12|   11|2021|     mobile 5g|
+---+-----+----+--------------+
only showing top 5 rows



In [None]:
# Find out the datatype of the column searchterm?

In [49]:
df.dtypes[3]

('searchterm', 'string')

In [None]:
# How many times was the term `gaming laptop` searched?

In [59]:
df.where(df.searchterm == 'gaming laptop').count()

499

In [None]:
# Print the top 5 most frequently used search terms?

In [68]:
def top_terms(df=df, col='searchterm', top=5):
    l = df.groupBy('searchterm').count().collect()
    [print(l[x]) for x in range(top)]
top_terms()



Row(searchterm='mobile 5g', count=2301)
Row(searchterm='ebooks data science', count=410)
Row(searchterm='mobile 6 inch', count=2312)
Row(searchterm='tablet 10 inch', count=715)
Row(searchterm='laptop', count=935)


                                                                                

In [69]:
%%bash
# Get pretrained sales forecasting model
wget  https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0321EN-SkillsNetwork/Bigdata%20and%20Spark/model.tar.gz
tar -xzf model.tar.gz 

--2023-06-28 16:13:31--  https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0321EN-SkillsNetwork/Bigdata%20and%20Spark/model.tar.gz
Resolving cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)... 169.63.118.104
Connecting to cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)|169.63.118.104|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1490 (1.5K) [application/x-tar]
Saving to: ‘model.tar.gz’

     0K .                                                     100% 11.3M=0s

2023-06-28 16:13:32 (11.3 MB/s) - ‘model.tar.gz’ saved [1490/1490]



In [84]:
# Load the sales forecast model.
from pyspark.ml.regression import LinearRegressionModel
model = LinearRegressionModel.load('sales_prediction.model')
from pyspark.ml.feature import VectorAssembler

In [85]:
# Using the sales forecast model, predict the sales for the year of 2023.

In [86]:
model.explainParams()

'aggregationDepth: suggested depth for treeAggregate (>= 2) (default: 2)\nelasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty (default: 0.0)\nepsilon: The shape parameter to control the amount of robustness. Must be > 1.0. (default: 1.35)\nfeaturesCol: features column name (current: features)\nfitIntercept: whether to fit an intercept term (default: True)\nlabelCol: label column name (current: sales)\nloss: The loss function to be optimized. Supported options: squaredError, huber. (Default squaredError) (default: squaredError)\nmaxIter: maximum number of iterations (>= 0) (current: 100)\npredictionCol: prediction column name (default: prediction)\nregParam: regularization parameter (>= 0) (current: 0.1)\nsolver: The solver algorithm for optimization. Supported options: auto, normal, l-bfgs. (Default auto) (default: auto)\nstandardization: whether to standardize the training features before fi

In [87]:
df2 = spark.read.parquet('./sales_prediction.model/data/part-00000-1db9fe2f-4d93-4b1f-966b-3b09e72d664e-c000.snappy.parquet')
df2.show()

+-------------------+-------------------+-----+
|          intercept|       coefficients|scale|
+-------------------+-------------------+-----+
|-13019.989140447298|[6.522567861288859]|  1.0|
+-------------------+-------------------+-----+



In [88]:
def predict(year):
    assembler = VectorAssembler(inputCols=["year"],outputCol="features")
    data = [[year,0]]
    columns = ["year", "sales"]
    _ = spark.createDataFrame(data, columns)
    __ = assembler.transform(_).select('features','sales')
    predictions = model.transform(__)
    predictions.select('prediction').show()

In [89]:
predict(2023)

                                                                                

+------------------+
|        prediction|
+------------------+
|175.16564294006457|
+------------------+



23/06/28 16:39:51 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
23/06/28 16:39:51 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
