## Introduction  
Now that we've run the model locally with one month of data, we'd like to build it using multiple months. The data is about 10GB zipped and much larger unzipped. Loading all of it into a pandas DataFrame will likely run out of memory, depending on your machine. Instead, we'll write the code locally in PySpark against one month of data, then migrate it to run on EMR over multiple unzipped files.

## Objectives  
* Migrate the model to PySpark to take full advantage of distributed computing resources

First, use boto3 to create an S3 client, then check whether the file exists in your bucket. If it doesn't, you may need to upload it. You can skip this step for now, but it will be helpful in the next lab, where you'll pull the data from the S3 bucket.
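A minimal sketch of that check, assuming AWS credentials are already configured. The helper takes the S3 client as an argument so it isn't tied to one connection; the bucket name, key, and local filename in the usage comments are hypothetical placeholders.

```python
def s3_object_exists(s3_client, bucket: str, key: str) -> bool:
    """Return True if an object with exactly this key exists in the bucket.

    S3 lists keys in UTF-8 binary order, so if the exact key exists it is the
    first (shortest) match for Prefix=key, and MaxKeys=1 is enough.
    """
    response = s3_client.list_objects_v2(Bucket=bucket, Prefix=key, MaxKeys=1)
    return any(obj["Key"] == key for obj in response.get("Contents", []))


# Example usage (hypothetical bucket, key, and local filename):
# import boto3
# s3 = boto3.client("s3")
# if not s3_object_exists(s3, "my-ecommerce-bucket", "raw/2019-Nov.csv"):
#     s3.upload_file("2019-Nov.csv", "my-ecommerce-bucket", "raw/2019-Nov.csv")
```

Listing with a prefix avoids having to catch the 404 error that `head_object` raises for a missing key.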

In [2]:
!pip install pyspark



In [3]:
import boto3
from pyspark.sql import SparkSession
#solution


In [4]:
spark = SparkSession \
    .builder \
    .appName("XGBoost") \
    .getOrCreate()

In [5]:
# path can be a local file or, on EMR, an S3 URI (e.g. s3://<bucket>/<key>); Spark reads it directly, not through boto3
path = "C:/Users/takinlabi/Downloads/archive/2019-Nov.csv"
df = spark.read.csv(path=path, header="true", inferSchema="true")

In [6]:
display(df)

DataFrame[event_time: string, event_type: string, product_id: int, category_id: bigint, category_code: string, brand: string, price: double, user_id: int, user_session: string]

In [7]:
#df.cache()

### How many unique customers?

In [8]:
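# solution
from pyspark.sql import functions as F
# Spark Columns have no nunique(); count distinct user_id values with an aggregate instead.
visitor = df.select(F.countDistinct("user_id")).first()[0]
print("visitors: {}".format(visitor))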

### Preprocess the data

Using the logic from the previous lab, use PySpark functions to explore the dataset.

# Modeling: Cart Abandonment

The model will be similar to the previous one: first build out the new features, then build the model.

In [None]:
# solution for additional columns/features

In [28]:
# df should be the dataframe with additional columns/features
train, test = df.randomSplit([0.7, 0.3], seed=42)
print("There are %d training examples and %d test examples." % (train.count(), test.count()))


Most MLlib algorithms require a single input column containing a vector of features and a single target column. The DataFrame currently has one column for each feature. MLlib provides functions to help you prepare the dataset in the required format.

MLlib pipelines combine multiple steps into a single workflow, making it easier to iterate as you develop the model.

In this example, you create a pipeline from the following stages:

- VectorAssembler: Assembles the feature columns into a feature vector.
- VectorIndexer: Identifies columns that should be treated as categorical. It does this heuristically: any feature with at most `maxCategories` distinct values is treated as categorical. For example, a binary flag column (0 or 1) would be treated as categorical.
- XgboostRegressor: Uses the XGBoost regression estimator to learn to predict the target column (for this lab, cart abandonment) from the feature vectors.
- CrossValidator: XGBoost has several hyperparameters, and this notebook shows how to tune them in Spark. CrossValidator automatically evaluates every combination in a grid of hyperparameter values using k-fold cross-validation and keeps the best resulting model.

In [29]:
from pyspark.ml.feature import VectorAssembler, VectorIndexer

# VectorAssembler accepts only numeric, boolean, or vector columns, so start from the
# numeric columns, then remove the target column from the input feature set.
featuresCols = [c for c, t in df.dtypes if t in ("int", "bigint", "float", "double")]
# featuresCols.remove('your target column')

# vectorAssembler combines all feature columns into a single feature vector column, "rawFeatures".
vectorAssembler = VectorAssembler(inputCols=featuresCols, outputCol="rawFeatures")

# vectorIndexer identifies categorical features, indexes them, and writes the result to a new column, "features".
vectorIndexer = VectorIndexer(inputCol="rawFeatures", outputCol="features", maxCategories=4)


In [26]:
!pip install tensorflow


In [30]:
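# sparkdl.xgboost ships with Databricks Runtime ML; the "sparkdl" package on PyPI fails to import (it looks for tensorframes).
from sparkdl.xgboost import XgboostRegressor

xgb_regressor = XgboostRegressor(num_workers=3, labelCol="your_label_column", missing=0.0)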


In [None]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
 
# Define a grid of hyperparameters to test:
#  - max_depth: maximum depth of each decision tree
#  - n_estimators: number of boosting rounds, i.e. the total number of trees
paramGrid = ParamGridBuilder()\
  .addGrid(xgb_regressor.max_depth, [2, 5])\
  .addGrid(xgb_regressor.n_estimators, [10, 100])\
  .build()
 
# Define an evaluation metric.  The CrossValidator compares the true labels with predicted values for each combination of parameters, and calculates this value to determine the best model.
evaluator = RegressionEvaluator(metricName="rmse",
                                labelCol=xgb_regressor.getLabelCol(),
                                predictionCol=xgb_regressor.getPredictionCol())
 
# Declare the CrossValidator, which performs the model tuning.
cv = CrossValidator(estimator=xgb_regressor, evaluator=evaluator, estimatorParamMaps=paramGrid)


### Create the pipeline

In [20]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[vectorAssembler, vectorIndexer, cv])
