
## Assignment

### Data
The data that you need for this task can be downloaded here: 
https://drive.google.com/file/d/1FD7tK1mKW5tRKgoyDGj_VZeK0OnB4GeV/view?usp=drive_link

The CSV holds transaction data representing the service bookings of a specific game from 2018 to 2022. For data protection reasons, the data is not real; however, its structure is based on real data.

### Business goal of this task
The overall goal is to analyze a time series of sales data and predict future sales.

### Tasks
●	Get familiar with the transaction data and analyze it

●	Create a forecast model for sales in 2023

●	Describe the steps you are taking and why you are taking them

●	Describe possible influencing factors on sales and the forecast

●	Describe your ideas on how to improve the quality of the forecast

●	Optional: create a Sales Dashboard in Tableau and visualize your analysis findings and sales predictions


### Results and Conclusion
The analysis builds a forecasting model in PySpark that predicts the service runtime amount (`service_runtime_amount_sec`) for the year 2023 from the independent features 'Year', 'Month_Indexed', 'Day_Indexed', 'transaction_country_cd_Indexed', and 'datacenter_country_cd_Indexed', which were considered to be the influencing factors for the forecast. Linear Regression was used because the data is structured and the prediction targets a single dependent variable, the service runtime, based on several independent variables, a setting in which linear regression usually works well.

A random 75%/25% split was used to create the training and test data.
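
Because the data is a time series, a random split lets the model train on rows that are later than some of the test rows. One alternative worth comparing is a chronological split that holds out the most recent period. The following is a minimal plain-Python sketch of that idea; the `chronological_split` helper, the cutoff date, and the example timestamps are assumptions for illustration and are not part of the original notebook.

```python
from datetime import datetime

def chronological_split(rows, cutoff):
    """Split (timestamp, value) rows into train (before cutoff) and test (cutoff or later)."""
    train = [r for r in rows if r[0] < cutoff]
    test = [r for r in rows if r[0] >= cutoff]
    return train, test

# Illustrative rows only: the timestamps are invented for this sketch
rows = [
    (datetime(2020, 5, 1), 2592000),
    (datetime(2021, 11, 15), 259200),
    (datetime(2022, 3, 9), 7776000),
    (datetime(2022, 8, 20), 2592000),
]

# Hold out everything from 2022 onwards as the test period
train, test = chronological_split(rows, cutoff=datetime(2022, 1, 1))
print(len(train), len(test))
```

With this kind of split, the test error is closer to what can be expected when forecasting a year that the model has never seen.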

The forecast could be improved by trying other ML methods, such as unsupervised clustering, and by comparing the candidate models on the accuracy of their predictions.

### Visualization
Tableau results: https://public.tableau.com/views/ForecastingServerRuntime-2/Dashboard1?:language=en-US&publish=yes&:display_count=n&:origin=viz_share_link

The dashboard above shows the service runtime usage by month, quarter, and year, together with monthly and quarterly forecasts of the service runtime, covering the years 2018 to 2025.
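
The monthly and quarterly views come down to summing `service_runtime_amount_sec` per period. A small plain-Python sketch of that aggregation follows, using the first seven rows that appear in the notebook output further down; the variable names are illustrative, and this is not how Tableau computes its aggregates internally.

```python
from collections import defaultdict
from datetime import datetime

# First seven rows of the notebook output: (transaction_ts, service_runtime_amount_sec)
rows = [
    ("2018-01-01 02:35:29", 2592000),
    ("2018-01-01 02:36:08", 7776000),
    ("2018-01-01 03:57:15", 259200),
    ("2018-01-01 10:44:08", 2592000),
    ("2018-01-01 13:23:05", 2592000),
    ("2018-01-01 14:12:41", 2592000),
    ("2018-01-01 14:12:56", 2592000),
]

monthly = defaultdict(int)    # (year, month) -> total runtime in seconds
quarterly = defaultdict(int)  # (year, quarter) -> total runtime in seconds

for ts_text, runtime_sec in rows:
    ts = datetime.strptime(ts_text, "%Y-%m-%d %H:%M:%S")
    quarter = (ts.month - 1) // 3 + 1  # months 1-3 -> Q1, 4-6 -> Q2, ...
    monthly[(ts.year, ts.month)] += runtime_sec
    quarterly[(ts.year, quarter)] += runtime_sec

print(dict(monthly))
print(dict(quarterly))
```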

In [0]:
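# File location and type
file_location = "/FileStore/tables/data_ds_challenge.csv"
file_type = "csv"

# Read the CSV with a header row and let Spark infer the column types
# (`spark` is the SparkSession provided by the notebook environment)
df_pyspark = spark.read.csv(file_location, header=True, inferSchema=True)
df_pyspark.show()
df_pyspark.printSchema()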


+--------------------+-------------------+----------------------+-----------------------+--------------------+-----------------------+---------------------+--------------------+--------------------------+-----------------+----------------+
|      transaction_cd|     transaction_ts|transaction_country_cd|transaction_currency_cd|             user_cd|user_profile_country_cd|datacenter_country_cd|          service_cd|service_runtime_amount_sec|service_slot_size|service_ram_size|
+--------------------+-------------------+----------------------+-----------------------+--------------------+-----------------------+---------------------+--------------------+--------------------------+-----------------+----------------+
|cd1e056a1b1ec8abc...|2018-01-01 02:35:29|                    DE|                    EUR|995db235689f65853...|                     DE|                   DE|3a3d5c19863eeb78d...|                   2592000|               10|            1024|
|2b53c3e03e5a760d8...|2018-01-01 02:36:0

In [0]:
# Keep only the timestamp, the two country codes and the target column
df_pyspark2 = df_pyspark.drop(
    'transaction_cd', 'transaction_currency_cd', 'user_cd', 'user_profile_country_cd',
    'service_cd', 'service_slot_size', 'service_ram_size')
df_pyspark2.show()

+-------------------+----------------------+---------------------+--------------------------+
|     transaction_ts|transaction_country_cd|datacenter_country_cd|service_runtime_amount_sec|
+-------------------+----------------------+---------------------+--------------------------+
|2018-01-01 02:35:29|                    DE|                   DE|                   2592000|
|2018-01-01 02:36:08|                    US|                   US|                   7776000|
|2018-01-01 03:57:15|                    US|                   US|                    259200|
|2018-01-01 10:44:08|                    US|                   US|                   2592000|
|2018-01-01 13:23:05|                    ES|                   DE|                   2592000|
|2018-01-01 14:12:41|                    FR|                   DE|                   2592000|
|2018-01-01 14:12:56|                    DE|                   DE|                   2592000|
|2018-01-01 15:15:45|                    DE|                

In [0]:
from pyspark.sql.functions import date_format

# Split the timestamp into calendar parts. Note that two columns share the
# alias "Date" (day of month and full date); both are dropped before modelling.
ts = df_pyspark2.transaction_ts
df_pyspark2 = df_pyspark2.select(
    date_format(ts, "yyyy").alias("Year"),
    date_format(ts, "MMMM").alias("Month"),
    date_format(ts, "dd").alias("Date"),
    date_format(ts, "MM/dd/yyyy").alias("Date"),
    # "HH" is the 24-hour clock; "hh" would render 13:23:05 as 01:23:05
    date_format(ts, "HH:mm:ss").alias("Time"),
    date_format(ts, "E").alias("Day"),
    df_pyspark2.transaction_country_cd,
    df_pyspark2.datacenter_country_cd,
    df_pyspark2.service_runtime_amount_sec,
)


In [0]:
df_pyspark2.show()

+----+-------+----+----------+--------+---+----------------------+---------------------+--------------------------+
|Year|  Month|Date|      Date|    Time|Day|transaction_country_cd|datacenter_country_cd|service_runtime_amount_sec|
+----+-------+----+----------+--------+---+----------------------+---------------------+--------------------------+
|2018|January|  01|01/01/2018|02:35:29|Mon|                    DE|                   DE|                   2592000|
|2018|January|  01|01/01/2018|02:36:08|Mon|                    US|                   US|                   7776000|
|2018|January|  01|01/01/2018|03:57:15|Mon|                    US|                   US|                    259200|
|2018|January|  01|01/01/2018|10:44:08|Mon|                    US|                   US|                   2592000|
|2018|January|  01|01/01/2018|13:23:05|Mon|                    ES|                   DE|                   2592000|
|2018|January|  01|01/01/2018|14:12:41|Mon|                    FR|      
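
Spark's `date_format` patterns distinguish `hh` (12-hour clock) from `HH` (24-hour clock), which matters for afternoon timestamps such as 13:23:05 in the raw data. Python's `strftime` has the same distinction (`%I` versus `%H`), so the effect can be illustrated without a Spark session. This is an analogy in plain Python, not the Spark call itself.

```python
from datetime import datetime

# Fifth row of the transaction data: 2018-01-01 13:23:05
ts = datetime(2018, 1, 1, 13, 23, 5)

twelve_hour = ts.strftime("%I:%M:%S")       # analogous to Spark's "hh:mm:ss"
twenty_four_hour = ts.strftime("%H:%M:%S")  # analogous to Spark's "HH:mm:ss"

print(twelve_hour)       # 01:23:05
print(twenty_four_hour)  # 13:23:05
```

Without an AM/PM marker, the 12-hour rendering makes a morning and an afternoon booking indistinguishable.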

In [0]:
from pyspark.ml.feature import StringIndexer

# Map each categorical column to a numeric index
# (by default the most frequent label gets index 0.0)
indexer = StringIndexer(
    inputCols=["Month", "Day", "transaction_country_cd", "datacenter_country_cd"],
    outputCols=["Month_Indexed", "Day_Indexed", "transaction_country_cd_Indexed", "datacenter_country_cd_Indexed"],
)

# Fit the indexer, then transform the DataFrame with the fitted model
df_indexed = indexer.fit(df_pyspark2).transform(df_pyspark2)
df_indexed.show()

+----+-------+----+----------+--------+---+----------------------+---------------------+--------------------------+-------------+-----------+------------------------------+-----------------------------+
|Year|  Month|Date|      Date|    Time|Day|transaction_country_cd|datacenter_country_cd|service_runtime_amount_sec|Month_Indexed|Day_Indexed|transaction_country_cd_Indexed|datacenter_country_cd_Indexed|
+----+-------+----+----------+--------+---+----------------------+---------------------+--------------------------+-------------+-----------+------------------------------+-----------------------------+
|2018|January|  01|01/01/2018|02:35:29|Mon|                    DE|                   DE|                   2592000|          2.0|        3.0|                           0.0|                          0.0|
|2018|January|  01|01/01/2018|02:36:08|Mon|                    US|                   US|                   7776000|          2.0|        3.0|                           1.0|                
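
The indices in this output are not calendar or alphabetical positions: by default `StringIndexer` orders labels by descending frequency, which is why January maps to 2.0 rather than 0.0. The sketch below imitates that ordering in plain Python on the `transaction_country_cd` values of the first seven rows. The `frequency_index` helper is hypothetical, and the alphabetical tie-break is an assumption based on the behaviour documented for Spark 3. On this small sample the result differs from the full data set, where DE receives 0.0 and US 1.0.

```python
from collections import Counter

def frequency_index(labels):
    """Map each label to a float index: most frequent first, ties broken alphabetically."""
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

# transaction_country_cd values from the first seven rows of the data
countries = ["DE", "US", "US", "US", "ES", "FR", "DE"]
mapping = frequency_index(countries)
print(mapping)  # {'US': 0.0, 'DE': 1.0, 'ES': 2.0, 'FR': 3.0}
```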

In [0]:
# Drop the raw string columns now that their indexed versions exist
df_indexed = df_indexed.drop('Date', 'Time', 'Month', 'Day', 'transaction_country_cd', 'datacenter_country_cd')
df_indexed.show()

+----+--------------------------+-------------+-----------+------------------------------+-----------------------------+
|Year|service_runtime_amount_sec|Month_Indexed|Day_Indexed|transaction_country_cd_Indexed|datacenter_country_cd_Indexed|
+----+--------------------------+-------------+-----------+------------------------------+-----------------------------+
|2018|                   2592000|          2.0|        3.0|                           0.0|                          0.0|
|2018|                   7776000|          2.0|        3.0|                           1.0|                          1.0|
|2018|                    259200|          2.0|        3.0|                           1.0|                          1.0|
|2018|                   2592000|          2.0|        3.0|                           1.0|                          1.0|
|2018|                   2592000|          2.0|        3.0|                           8.0|                          0.0|
|2018|                   2592000
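
A linear model treats these index values as magnitudes, so a country with index 8.0 is assumed to have eight times the effect of a country with index 1.0, although the numbers are only labels. One option that could be tried (it is not used in this notebook) is one-hot encoding, for which Spark ML offers `OneHotEncoder`. The plain-Python sketch below only illustrates the idea; the `one_hot` helper and the category count of 10 are assumptions.

```python
def one_hot(index, size):
    """Return a list of `size` zeros with a single 1.0 at position `index`."""
    vector = [0.0] * size
    vector[int(index)] = 1.0
    return vector

# Index 8.0 as it appears for transaction_country_cd_Indexed; 10 categories is a made-up size
encoded = one_hot(8.0, 10)
print(encoded)
```

Each category then gets its own coefficient instead of sharing one slope across unrelated labels.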

In [0]:
from pyspark.sql.types import IntegerType

# date_format returns strings, so cast Year to an integer before assembling features
df_indexed = df_indexed.withColumn("Year", df_indexed["Year"].cast(IntegerType()))

In [0]:
df_indexed.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- service_runtime_amount_sec: integer (nullable = true)
 |-- Month_Indexed: double (nullable = false)
 |-- Day_Indexed: double (nullable = false)
 |-- transaction_country_cd_Indexed: double (nullable = false)
 |-- datacenter_country_cd_Indexed: double (nullable = false)



In [0]:
df_indexed.show()

+----+--------------------------+-------------+-----------+------------------------------+-----------------------------+
|Year|service_runtime_amount_sec|Month_Indexed|Day_Indexed|transaction_country_cd_Indexed|datacenter_country_cd_Indexed|
+----+--------------------------+-------------+-----------+------------------------------+-----------------------------+
|2018|                   2592000|          2.0|        3.0|                           0.0|                          0.0|
|2018|                   7776000|          2.0|        3.0|                           1.0|                          1.0|
|2018|                    259200|          2.0|        3.0|                           1.0|                          1.0|
|2018|                   2592000|          2.0|        3.0|                           1.0|                          1.0|
|2018|                   2592000|          2.0|        3.0|                           8.0|                          0.0|
|2018|                   2592000

In [0]:
df_indexed.columns

Out[68]: ['Year',
 'service_runtime_amount_sec',
 'Month_Indexed',
 'Day_Indexed',
 'transaction_country_cd_Indexed',
 'datacenter_country_cd_Indexed']

In [0]:
from pyspark.ml.feature import VectorAssembler

# Combine the feature columns into a single vector column for Spark ML
featureassembler = VectorAssembler(
    inputCols=['Year', 'Month_Indexed', 'Day_Indexed',
               'transaction_country_cd_Indexed', 'datacenter_country_cd_Indexed'],
    outputCol="Independent Features")
output = featureassembler.transform(df_indexed)


In [0]:
output.select("Independent Features").show()

+--------------------+
|Independent Features|
+--------------------+
|[2018.0,2.0,3.0,0...|
|[2018.0,2.0,3.0,1...|
|[2018.0,2.0,3.0,1...|
|[2018.0,2.0,3.0,1...|
|[2018.0,2.0,3.0,8...|
|[2018.0,2.0,3.0,2...|
|[2018.0,2.0,3.0,0...|
|[2018.0,2.0,3.0,0...|
|[2018.0,2.0,3.0,9...|
|[2018.0,2.0,3.0,4...|
|[2018.0,2.0,3.0,0...|
|[2018.0,2.0,3.0,2...|
|[2018.0,2.0,3.0,0...|
|[2018.0,2.0,3.0,0...|
|[2018.0,2.0,3.0,9...|
|[2018.0,2.0,3.0,1...|
|[2018.0,2.0,3.0,0...|
|[2018.0,2.0,3.0,1...|
|[2018.0,2.0,3.0,1...|
|[2018.0,2.0,5.0,1...|
+--------------------+
only showing top 20 rows



In [0]:
finalized_data=output.select("Independent Features","service_runtime_amount_sec")
finalized_data.show()

+--------------------+--------------------------+
|Independent Features|service_runtime_amount_sec|
+--------------------+--------------------------+
|[2018.0,2.0,3.0,0...|                   2592000|
|[2018.0,2.0,3.0,1...|                   7776000|
|[2018.0,2.0,3.0,1...|                    259200|
|[2018.0,2.0,3.0,1...|                   2592000|
|[2018.0,2.0,3.0,8...|                   2592000|
|[2018.0,2.0,3.0,2...|                   2592000|
|[2018.0,2.0,3.0,0...|                   2592000|
|[2018.0,2.0,3.0,0...|                   2592000|
|[2018.0,2.0,3.0,9...|                   2592000|
|[2018.0,2.0,3.0,4...|                   2592000|
|[2018.0,2.0,3.0,0...|                    259200|
|[2018.0,2.0,3.0,2...|                   2592000|
|[2018.0,2.0,3.0,0...|                   2592000|
|[2018.0,2.0,3.0,0...|                   2592000|
|[2018.0,2.0,3.0,9...|                   2592000|
|[2018.0,2.0,3.0,1...|                   2592000|
|[2018.0,2.0,3.0,0...|                   2592000|


In [0]:
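from pyspark.ml.regression import LinearRegression

# Random 75/25 train/test split (no seed is set, so the split differs between runs)
train_data, test_data = finalized_data.randomSplit([0.75, 0.25])
regressor = LinearRegression(featuresCol="Independent Features", labelCol="service_runtime_amount_sec")
# fit() returns the trained model, which replaces the estimator in `regressor`
regressor = regressor.fit(train_data)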

In [0]:
regressor.coefficients

Out[77]: DenseVector([-28032.3513, 4838.9956, 2766.2799, -5238.9879, 1302.2741])

In [0]:
regressor.intercept

Out[78]: 59665737.471472844
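
The coefficients and intercept fully determine each prediction: the model adds the intercept to the sum of each coefficient times its feature value. As a worked check in plain Python, the sketch below recomputes the prediction for a row with Year 2018 and all indexed features equal to 0.0, which corresponds to the first row of the prediction table further down. The `predict` helper is illustrative; the coefficients are the rounded values printed above, so the result matches Spark's 3096452.62 only to within rounding.

```python
# Coefficients and intercept as printed by the notebook (coefficients rounded to 4 decimals)
coefficients = [-28032.3513, 4838.9956, 2766.2799, -5238.9879, 1302.2741]
intercept = 59665737.471472844

def predict(features):
    """Linear regression prediction: intercept + sum(coefficient_i * feature_i)."""
    return intercept + sum(c * x for c, x in zip(coefficients, features))

# Feature order: Year, Month_Indexed, Day_Indexed,
# transaction_country_cd_Indexed, datacenter_country_cd_Indexed
prediction_2018 = predict([2018.0, 0.0, 0.0, 0.0, 0.0])
prediction_2019 = predict([2019.0, 0.0, 0.0, 0.0, 0.0])

print(prediction_2018)
print(prediction_2019 - prediction_2018)  # one extra year shifts the prediction by the Year coefficient
```

The negative Year coefficient means that, all else equal, the model predicts about 28,000 fewer runtime seconds per booking for each later year.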

In [0]:
### Predictions
# Evaluate the model on the held-out test data and show predictions next to the actual values
pred_results=regressor.evaluate(test_data)
pred_results.predictions.show()

+--------------------+--------------------------+------------------+
|Independent Features|service_runtime_amount_sec|        prediction|
+--------------------+--------------------------+------------------+
|    (5,[0],[2018.0])|                   2592000|    3096452.620701|
|    (5,[0],[2018.0])|                   2592000|    3096452.620701|
|    (5,[0],[2019.0])|                   2592000|3068420.2694369927|
|    (5,[0],[2019.0])|                   2592000|3068420.2694369927|
|    (5,[0],[2019.0])|                   2592000|3068420.2694369927|
|    (5,[0],[2019.0])|                   2592000|3068420.2694369927|
|    (5,[0],[2019.0])|                   2592000|3068420.2694369927|
|    (5,[0],[2019.0])|                   2592000|3068420.2694369927|
|    (5,[0],[2019.0])|                   2592000|3068420.2694369927|
|    (5,[0],[2019.0])|                   2592000|3068420.2694369927|
|    (5,[0],[2020.0])|                   2592000| 3040387.918172978|
|    (5,[0],[2020.0])|            
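
To judge the model, the gap between the actual and predicted values needs to be summarised in an error metric. The summary object returned by `evaluate` should expose such metrics for the whole test set (for example `rootMeanSquaredError`, `meanAbsoluteError` and `r2`). As a plain-Python sketch of what these numbers mean, the code below computes MAE and RMSE on the eleven complete rows visible above. Those rows all share the label 2592000, so the result only illustrates the calculation and is not representative of the full test set.

```python
import math

# (actual, predicted) pairs copied from the complete rows of the prediction table
pairs = (
    [(2592000, 3096452.620701)] * 2        # Year 2018
    + [(2592000, 3068420.2694369927)] * 8  # Year 2019
    + [(2592000, 3040387.918172978)] * 1   # Year 2020
)

errors = [predicted - actual for actual, predicted in pairs]

mae = sum(abs(e) for e in errors) / len(errors)             # mean absolute error
rmse = math.sqrt(sum(e * e for e in errors) / len(errors))  # root mean squared error

print(f"MAE:  {mae:.1f} seconds")
print(f"RMSE: {rmse:.1f} seconds")
```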