# Senior ML Engineer Assignment | Daniel Helfman

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.0.3/spark-3.0.3-bin-hadoop2.7.tgz
!tar xf spark-3.0.3-bin-hadoop2.7.tgz
!pip install -q findspark

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.3-bin-hadoop2.7"

In [None]:
import findspark
findspark.init()

In [None]:
from pyspark.sql import SparkSession

In [None]:
spark = (
    SparkSession
        .builder
        .appName("programming")
        .master("local")
        .config("spark.jars.packages", "io.delta:delta-core_2.12:0.7.0")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        .config('spark.ui.port', '4050')
        .getOrCreate()
)

In [None]:
orders_path = '/content/drive/MyDrive/Colab Notebooks/MMM_DEVELOPMENT/ANALYTICS_PROJECTS/SONY-INTERVIEW-ASSIGNMENT/2_intermediate_exercise/resources/orders.csv'
orders_df = spark.read.option("header", True).option("inferSchema", True).csv(orders_path)

orders_df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- store_id: integer (nullable = true)
 |-- date: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- total_price: double (nullable = true)



In [None]:
orders_df = orders_df.withColumnRenamed("id","order_id")
orders_df.show()

+--------+--------+----------+-----------+-----------+
|order_id|store_id|      date|customer_id|total_price|
+--------+--------+----------+-----------+-----------+
|  100001|    3001|2021-06-16|     200001|      15.95|
|  100002|    3001|2021-06-16|     200002|      31.24|
|  100003|    3001|2021-06-16|     200003|       2.13|
|  100004|    3001|2021-06-17|     200005|      12.09|
|  100005|    3001|2021-06-17|     200002|      38.52|
|  100006|    3001|2021-06-17|     200002|      38.52|
|  100007|    3001|2021-06-17|     200007|        1.4|
|  100008|    3001|2021-06-18|     200004|       91.2|
|  100009|    3001|2021-06-18|     200004|       3.45|
|  100010|    3001|2021-06-18|     200002|      18.71|
|  100011|    3001|2021-06-18|     200007|      12.11|
+--------+--------+----------+-----------+-----------+



In [None]:
customers_path = '/content/drive/MyDrive/Colab Notebooks/MMM_DEVELOPMENT/ANALYTICS_PROJECTS/SONY-INTERVIEW-ASSIGNMENT/2_intermediate_exercise/resources/customers.csv'
customers_df = spark.read.option("header", True).option("inferSchema", True).csv(customers_path)
customers_df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- fname: string (nullable = true)
 |-- lname: string (nullable = true)
 |-- reward_card_num: integer (nullable = true)
 |-- joined: string (nullable = true)
 |-- address1: string (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)
 |-- zipcode: string (nullable = true)



In [None]:
customers_df.show()

+------+------------+------------+---------------+----------+--------+-----+-------+----------+
|    id|       fname|       lname|reward_card_num|    joined|address1|state|country|   zipcode|
+------+------------+------------+---------------+----------+--------+-----+-------+----------+
|200001|Matlalihuitl|    Rackett |         111111|2019-11-18|123 Main|   WA|     US|     98101|
|200002|     Nicolas|Belarde Ross|         222222|2001-01-01|125 Main|   WA|     US|     98101|
|200003|    Carmello|       Wygal|         333333|2021-03-05|923 Main|   WA|     us|     98101|
|200004|      Yazmin|    Pozzuoli|         444444|2015-08-19|100 Main|   WA|     US|98101-1234|
|200005|        Cate|       Hinde|         555555|2020-12-31|444 Main|   WA|     US|     98101|
|200006|      Astera|   Loeschner|         666666|2021-07-12|    null|   AZ|     US|     85001|
|200007|       Maeja|       Kempf|         777777| 2017-11-1|888 Mani|   MB|     CA|   R0G 0J0|
+------+------------+------------+------

In [None]:
joined_df = orders_df.join(customers_df, orders_df.customer_id == customers_df.id, 'inner')
joined_df.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- store_id: integer (nullable = true)
 |-- date: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- total_price: double (nullable = true)
 |-- id: integer (nullable = true)
 |-- fname: string (nullable = true)
 |-- lname: string (nullable = true)
 |-- reward_card_num: integer (nullable = true)
 |-- joined: string (nullable = true)
 |-- address1: string (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)
 |-- zipcode: string (nullable = true)



In [None]:
joined_df.cache()
joined_df.show()

+--------+--------+----------+-----------+-----------+------+------------+------------+---------------+----------+--------+-----+-------+----------+
|order_id|store_id|      date|customer_id|total_price|    id|       fname|       lname|reward_card_num|    joined|address1|state|country|   zipcode|
+--------+--------+----------+-----------+-----------+------+------------+------------+---------------+----------+--------+-----+-------+----------+
|  100001|    3001|2021-06-16|     200001|      15.95|200001|Matlalihuitl|    Rackett |         111111|2019-11-18|123 Main|   WA|     US|     98101|
|  100010|    3001|2021-06-18|     200002|      18.71|200002|     Nicolas|Belarde Ross|         222222|2001-01-01|125 Main|   WA|     US|     98101|
|  100006|    3001|2021-06-17|     200002|      38.52|200002|     Nicolas|Belarde Ross|         222222|2001-01-01|125 Main|   WA|     US|     98101|
|  100005|    3001|2021-06-17|     200002|      38.52|200002|     Nicolas|Belarde Ross|         222222|200

In [None]:
joined_df.createOrReplaceTempView('joinedView')

**1. Show total orders and revenue.**

In [None]:
total_orders_and_revenue_df = spark.sql('SELECT customer_id, COUNT(order_id) AS total_orders, SUM(total_price) AS total_revenue FROM joinedView GROUP BY customer_id ORDER BY total_revenue DESC')
total_orders_and_revenue_df.show()

+-----------+------------+-------------+
|customer_id|total_orders|total_revenue|
+-----------+------------+-------------+
|     200002|           4|       126.99|
|     200004|           2|        94.65|
|     200001|           1|        15.95|
|     200007|           2|        13.51|
|     200005|           1|        12.09|
|     200003|           1|         2.13|
+-----------+------------+-------------+



**2. What's the average order revenue per day?**

In [None]:
average_order_revenue_df = spark.sql('SELECT date, AVG(total_price) AS avg_order_revenue FROM joinedView GROUP BY date ORDER BY date DESC')
average_order_revenue_df.show()

+----------+------------------+
|      date| avg_order_revenue|
+----------+------------------+
|2021-06-18|           31.3675|
|2021-06-17|22.632500000000004|
|2021-06-16|             16.44|
+----------+------------------+



**3. Which order has the most revenue each day?**

In [None]:
revenue_by_day_df = spark.sql("SELECT date, order_id, total_price, MAX(total_price) OVER (PARTITION BY date) AS max_revenue_by_date FROM joinedView ORDER BY max_revenue_by_date DESC")
revenue_by_day_df.show()

+----------+--------+-----------+-------------------+
|      date|order_id|total_price|max_revenue_by_date|
+----------+--------+-----------+-------------------+
|2021-06-18|  100010|      18.71|               91.2|
|2021-06-18|  100009|       3.45|               91.2|
|2021-06-18|  100008|       91.2|               91.2|
|2021-06-18|  100011|      12.11|               91.2|
|2021-06-17|  100006|      38.52|              38.52|
|2021-06-17|  100007|        1.4|              38.52|
|2021-06-17|  100005|      38.52|              38.52|
|2021-06-17|  100004|      12.09|              38.52|
|2021-06-16|  100001|      15.95|              31.24|
|2021-06-16|  100002|      31.24|              31.24|
|2021-06-16|  100003|       2.13|              31.24|
+----------+--------+-----------+-------------------+



In [None]:
joined_df.show()

+--------+--------+----------+-----------+-----------+------+------------+------------+---------------+----------+--------+-----+-------+----------+
|order_id|store_id|      date|customer_id|total_price|    id|       fname|       lname|reward_card_num|    joined|address1|state|country|   zipcode|
+--------+--------+----------+-----------+-----------+------+------------+------------+---------------+----------+--------+-----+-------+----------+
|  100001|    3001|2021-06-16|     200001|      15.95|200001|Matlalihuitl|    Rackett |         111111|2019-11-18|123 Main|   WA|     US|     98101|
|  100010|    3001|2021-06-18|     200002|      18.71|200002|     Nicolas|Belarde Ross|         222222|2001-01-01|125 Main|   WA|     US|     98101|
|  100006|    3001|2021-06-17|     200002|      38.52|200002|     Nicolas|Belarde Ross|         222222|2001-01-01|125 Main|   WA|     US|     98101|
|  100005|    3001|2021-06-17|     200002|      38.52|200002|     Nicolas|Belarde Ross|         222222|200

**4. How much revenue by customer zip code?**

In [None]:
joined_df.show()

+--------+--------+----------+-----------+-----------+------+------------+------------+---------------+----------+--------+-----+-------+----------+
|order_id|store_id|      date|customer_id|total_price|    id|       fname|       lname|reward_card_num|    joined|address1|state|country|   zipcode|
+--------+--------+----------+-----------+-----------+------+------------+------------+---------------+----------+--------+-----+-------+----------+
|  100001|    3001|2021-06-16|     200001|      15.95|200001|Matlalihuitl|    Rackett |         111111|2019-11-18|123 Main|   WA|     US|     98101|
|  100010|    3001|2021-06-18|     200002|      18.71|200002|     Nicolas|Belarde Ross|         222222|2001-01-01|125 Main|   WA|     US|     98101|
|  100006|    3001|2021-06-17|     200002|      38.52|200002|     Nicolas|Belarde Ross|         222222|2001-01-01|125 Main|   WA|     US|     98101|
|  100005|    3001|2021-06-17|     200002|      38.52|200002|     Nicolas|Belarde Ross|         222222|200

In [None]:
revenue_by_day_df = spark.sql("SELECT zipcode, SUM(total_price) AS total_revenue_by_zipcode FROM joinedView GROUP BY zipcode ORDER BY total_revenue_by_zipcode DESC")
revenue_by_day_df.show()

+----------+------------------------+
|   zipcode|total_revenue_by_zipcode|
+----------+------------------------+
|     98101|      157.16000000000003|
|98101-1234|                   94.65|
|   R0G 0J0|                   13.51|
+----------+------------------------+



**5. Who has been a customer the longest?**

In [None]:
total_orders_and_revenue_df = spark.sql('SELECT * FROM joinedView ORDER BY joined ASC')
total_orders_and_revenue_df.show()

+--------+--------+----------+-----------+-----------+------+------------+------------+---------------+----------+--------+-----+-------+----------+
|order_id|store_id|      date|customer_id|total_price|    id|       fname|       lname|reward_card_num|    joined|address1|state|country|   zipcode|
+--------+--------+----------+-----------+-----------+------+------------+------------+---------------+----------+--------+-----+-------+----------+
|  100005|    3001|2021-06-17|     200002|      38.52|200002|     Nicolas|Belarde Ross|         222222|2001-01-01|125 Main|   WA|     US|     98101|
|  100002|    3001|2021-06-16|     200002|      31.24|200002|     Nicolas|Belarde Ross|         222222|2001-01-01|125 Main|   WA|     US|     98101|
|  100010|    3001|2021-06-18|     200002|      18.71|200002|     Nicolas|Belarde Ross|         222222|2001-01-01|125 Main|   WA|     US|     98101|
|  100006|    3001|2021-06-17|     200002|      38.52|200002|     Nicolas|Belarde Ross|         222222|200

**6. Which customer has the least amount of orders?**

In [None]:
revenue_by_day_df = spark.sql("SELECT customer_id, COUNT(order_id) AS total_customer_orders FROM joinedView GROUP BY customer_id ORDER BY total_customer_orders ASC")
revenue_by_day_df.show()

+-----------+---------------------+
|customer_id|total_customer_orders|
+-----------+---------------------+
|     200001|                    1|
|     200005|                    1|
|     200003|                    1|
|     200007|                    2|
|     200004|                    2|
|     200002|                    4|
+-----------+---------------------+



In [None]:
revenue_by_day_df = spark.sql("SELECT customer_id, fname, lname FROM joinedView WHERE customer_id IN ('200001', '200005', '200003') ORDER BY customer_id DESC")
revenue_by_day_df.show()

+-----------+------------+--------+
|customer_id|       fname|   lname|
+-----------+------------+--------+
|     200005|        Cate|   Hinde|
|     200003|    Carmello|   Wygal|
|     200001|Matlalihuitl|Rackett |
+-----------+------------+--------+



**7. BONUS PySpark Machine Learning Examples**

In [None]:
from pyspark.sql.functions import *
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
from pyspark.ml.clustering import BisectingKMeans

from pyspark.ml.feature import StringIndexer

from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

In [None]:
joined_df.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- store_id: integer (nullable = true)
 |-- date: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- total_price: double (nullable = true)
 |-- id: integer (nullable = true)
 |-- fname: string (nullable = true)
 |-- lname: string (nullable = true)
 |-- reward_card_num: integer (nullable = true)
 |-- joined: string (nullable = true)
 |-- address1: string (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)
 |-- zipcode: string (nullable = true)



In [None]:
joined_df.describe().show()

+-------+-----------------+--------+----------+------------------+------------------+------------------+--------+------------+------------------+----------+--------+-----+-------+-------+
|summary|         order_id|store_id|      date|       customer_id|       total_price|                id|   fname|       lname|   reward_card_num|    joined|address1|state|country|zipcode|
+-------+-----------------+--------+----------+------------------+------------------+------------------+--------+------------+------------------+----------+--------+-----+-------+-------+
|  count|               11|      11|        11|                11|                11|                11|      11|          11|                11|        11|      11|   11|     11|     11|
|   mean|         100006.0|  3001.0|      null|200003.54545454544|             24.12|200003.54545454544|    null|        null|          393939.0|      null|    null| null|   null|98101.0|
| stddev|3.316624790356683|     0.0|      null| 2.0670576365

In [None]:
joined_df.show()

+--------+--------+----------+-----------+-----------+------+------------+------------+---------------+----------+--------+-----+-------+----------+
|order_id|store_id|      date|customer_id|total_price|    id|       fname|       lname|reward_card_num|    joined|address1|state|country|   zipcode|
+--------+--------+----------+-----------+-----------+------+------------+------------+---------------+----------+--------+-----+-------+----------+
|  100001|    3001|2021-06-16|     200001|      15.95|200001|Matlalihuitl|    Rackett |         111111|2019-11-18|123 Main|   WA|     US|     98101|
|  100010|    3001|2021-06-18|     200002|      18.71|200002|     Nicolas|Belarde Ross|         222222|2001-01-01|125 Main|   WA|     US|     98101|
|  100006|    3001|2021-06-17|     200002|      38.52|200002|     Nicolas|Belarde Ross|         222222|2001-01-01|125 Main|   WA|     US|     98101|
|  100005|    3001|2021-06-17|     200002|      38.52|200002|     Nicolas|Belarde Ross|         222222|200

In [None]:
# input all the features in one vector column
assembler = VectorAssembler(inputCols=['order_id', 'store_id', 'customer_id', 'id', 'reward_card_num'], outputCol = 'features')
output = assembler.transform(joined_df)

finalized_data = output.select("features", "total_price")
finalized_data.show()

+--------------------+-----------+
|            features|total_price|
+--------------------+-----------+
|[100001.0,3001.0,...|      15.95|
|[100010.0,3001.0,...|      18.71|
|[100006.0,3001.0,...|      38.52|
|[100005.0,3001.0,...|      38.52|
|[100002.0,3001.0,...|      31.24|
|[100003.0,3001.0,...|       2.13|
|[100009.0,3001.0,...|       3.45|
|[100008.0,3001.0,...|       91.2|
|[100004.0,3001.0,...|      12.09|
|[100011.0,3001.0,...|      12.11|
|[100007.0,3001.0,...|        1.4|
+--------------------+-----------+



**KMeans Clustering Unsupervised Machine Learning**

In [None]:
kmeans = KMeans().setK(3)
kmeans = kmeans.setSeed(1)
kmodel = kmeans.fit(finalized_data)
centers = kmodel.clusterCenters()

# this will print out the 3 sets of clusters for your dataset
centers

[array([100009.,   3001., 200007., 200007., 777777.]),
 array([100004.5,   3001. , 200002. , 200002. , 222222. ]),
 array([100007.        ,   3001.        , 200004.33333333, 200004.33333333,
        481481.        ])]

**Hierarchical Clustering Unsupervised Machine Learning**

In [None]:
BKMeans = BisectingKMeans().setK(3)
BKMeans = BKMeans.setSeed(1)
BKModel = BKMeans.fit(finalized_data)
BKCenters = BKModel.clusterCenters()

# hierarchical clustering results
BKCenters

[array([100001.,   3001., 200001., 200001., 111111.]),
 array([100005.2,   3001. , 200002.2, 200002.2, 244444.2]),
 array([100007.8,   3001. , 200005.4, 200005.4, 599999.4])]

**Classification Supervised Machine Learning**

In [None]:
joined_df.show()

+--------+--------+----------+-----------+-----------+------+------------+------------+---------------+----------+--------+-----+-------+----------+
|order_id|store_id|      date|customer_id|total_price|    id|       fname|       lname|reward_card_num|    joined|address1|state|country|   zipcode|
+--------+--------+----------+-----------+-----------+------+------------+------------+---------------+----------+--------+-----+-------+----------+
|  100001|    3001|2021-06-16|     200001|      15.95|200001|Matlalihuitl|    Rackett |         111111|2019-11-18|123 Main|   WA|     US|     98101|
|  100010|    3001|2021-06-18|     200002|      18.71|200002|     Nicolas|Belarde Ross|         222222|2001-01-01|125 Main|   WA|     US|     98101|
|  100006|    3001|2021-06-17|     200002|      38.52|200002|     Nicolas|Belarde Ross|         222222|2001-01-01|125 Main|   WA|     US|     98101|
|  100005|    3001|2021-06-17|     200002|      38.52|200002|     Nicolas|Belarde Ross|         222222|200

In [None]:
indexer = StringIndexer(inputCol='zipcode', outputCol='label')
joined_df_classification = indexer.fit(joined_df).transform(joined_df)
joined_df_classification.show()

+--------+--------+----------+-----------+-----------+------+------------+------------+---------------+----------+--------+-----+-------+----------+-----+
|order_id|store_id|      date|customer_id|total_price|    id|       fname|       lname|reward_card_num|    joined|address1|state|country|   zipcode|label|
+--------+--------+----------+-----------+-----------+------+------------+------------+---------------+----------+--------+-----+-------+----------+-----+
|  100001|    3001|2021-06-16|     200001|      15.95|200001|Matlalihuitl|    Rackett |         111111|2019-11-18|123 Main|   WA|     US|     98101|  0.0|
|  100010|    3001|2021-06-18|     200002|      18.71|200002|     Nicolas|Belarde Ross|         222222|2001-01-01|125 Main|   WA|     US|     98101|  0.0|
|  100006|    3001|2021-06-17|     200002|      38.52|200002|     Nicolas|Belarde Ross|         222222|2001-01-01|125 Main|   WA|     US|     98101|  0.0|
|  100005|    3001|2021-06-17|     200002|      38.52|200002|     Nico

**Regression Supervised Machine Learning**

In [None]:
train_data, test_data = finalized_data.randomSplit([0.8,0.2])
regressor = LinearRegression(featuresCol = 'features', labelCol = 'total_price')

# fit the model from training set
regressor = regressor.fit(train_data)

In [None]:
# predict the total_price on testing set
pred = regressor.evaluate(test_data)

# predict the model
pred.predictions.show()

+--------------------+-----------+------------------+
|            features|total_price|        prediction|
+--------------------+-----------+------------------+
|[100007.0,3001.0,...|        1.4|15.392189735721331|
+--------------------+-----------+------------------+



In [None]:
# coefficient of the regression model
coeff = regressor.coefficients

# X and Y intercept
intr = regressor.intercept

print ("The coefficient of the model is : %a" %coeff)
print ("The Intercept of the model is : %f" %intr)

The coefficient of the model is : DenseVector([1.7468, 0.0, -1.1334, -1.1334, -0.0])
The Intercept of the model is : 278727.722097


In [None]:
eval = RegressionEvaluator(labelCol="total_price", predictionCol="prediction", metricName="rmse")

# Root Mean Square Error
rmse = eval.evaluate(pred.predictions)
print("RMSE: %.3f" % rmse)

# Mean Square Error
mse = eval.evaluate(pred.predictions, {eval.metricName: "mse"})
print("MSE: %.3f" % mse)

# Mean Absolute Error
mae = eval.evaluate(pred.predictions, {eval.metricName: "mae"})
print("MAE: %.3f" % mae)

# r2 - coefficient of determination
r2 = eval.evaluate(pred.predictions, {eval.metricName: "r2"})
print("r2: %.3f" %r2)

RMSE: 13.992
MSE: 195.781
MAE: 13.992
r2: -inf
