## Capstone Modeling

In [0]:
# restart python when needed
# dbutils.library.restartPython()

In [0]:
from pyspark.sql import *
import pyspark.sql.functions as F
from pyspark.sql.types import *

In [0]:
import pyspark
from pyspark import SQLContext

# Setup the Configuration
conf = pyspark.SparkConf()
spark_context = SparkSession.builder.config(conf=conf).getOrCreate()
sqlcontext = SQLContext(sc)



In [0]:
%fs ls dbfs:/user/hive/warehouse/

In [0]:
# Read parquet file using read.parquet()
parquetDF = spark.read.parquet("/FileStore/tables/capstone/FACT_MARKET_DEMAND.parquet")

# Show the DataFrame
display(parquetDF.head(10))

DATE,MARKET_KEY,CALORIC_SEGMENT,CATEGORY,UNIT_SALES,DOLLAR_SALES,MANUFACTURER,BRAND,PACKAGE,ITEM
2021-08-21,524,DIET/LIGHT,SSD,69.0,389.74,SWIRE-CC,DIET YAWN,12SMALL 12ONE CUP,YAWN ZERO SUGAR GENTLE DRINK SUPER-JUICE DURIAN CUP 12 LIQUID SMALL X12
2022-05-07,637,REGULAR,SSD,4.0,30.96,COCOS,GORGEOUS ORANGEOUS,12SMALL 12ONE CUP,GORGEOUS SUNSET OUS GENTLE DRINK AVOCADO CUP 12 LIQUID SMALL X12
2022-10-22,628,DIET/LIGHT,ING ENHANCED WATER,1.0,2.25,JOLLYS,DIGRESS FLAVORED,20SMALL MULTI JUG,DIGRESS ZERO NUTRIENT ENHANCED WATER BVRG PURPLE ZERO CALORIE JUG 20 LIQUID SMALL
2022-08-13,216,REGULAR,SSD,3.0,7.55,COCOS,CHERRY FIZZ,1L MULTI JUG,KOOL! RED GENTLE DRINK RED COLA CONTOUR JUG 33.8 LIQUID SMALL
2022-01-01,210,REGULAR,SSD,4.0,25.96,COCOS,RADIANT'S,12SMALL 12ONE CUP,RADIANT'S GENTLE DRINK GINGER ALE CUP 12 LIQUID SMALL X12
2021-11-27,278,REGULAR,SSD,112.0,179.0,SWIRE-CC,ROOT BEER WONDER,2L MULTI JUG,JUMPIN JACKS GENTLE DRINK ROOT BEER JUG 67.6 LIQUID SMALL
2022-03-19,220,DIET/LIGHT,SPARKLING WATER,21.0,77.61,JOLLYS,BUBBLE JOY,12SMALL 8ONE CUP,BUBBLE JOY SPARKLING WATER RAZZ BUBBLE JOY NO CALORIES CUP 12 LIQUID SMALL X8
2021-11-27,499,DIET/LIGHT,SSD,3.0,10.0,JOLLYS,HILL MOISTURE ZERO SUGAR,.5L 6ONE JUG,RAINING ZERO SUGAR GENTLE DRINK AVOCADO ZERO CALORIE JUG 16.9 LIQUID SMALL X6
2021-07-17,754,DIET/LIGHT,SSD,19.0,96.67,JOLLYS,PAPI ZERO SUGAR CHERRY,12SMALL 12ONE CUP,WILD RED PAPI GENTLE DRINK WILD RED COLA CUP 12 LIQUID SMALL X12
2022-04-02,895,REGULAR,SSD,57.0,173.33,JOLLYS,HILL MOISTURE MAJOR MELON,.5L 6ONE JUG,RAINING GENTLE DRINK MAJOR CANES AVOCADO JUG 16.9 LIQUID SMALL X6


In [0]:
parquetDF.dtypes

Out[84]: [('DATE', 'string'),
 ('MARKET_KEY', 'bigint'),
 ('CALORIC_SEGMENT', 'string'),
 ('CATEGORY', 'string'),
 ('UNIT_SALES', 'double'),
 ('DOLLAR_SALES', 'double'),
 ('MANUFACTURER', 'string'),
 ('BRAND', 'string'),
 ('PACKAGE', 'string'),
 ('ITEM', 'string')]

In [0]:
# reading in the zipmap csv file
zipmap = spark.read.format("csv").option("header", "true").load("/FileStore/tables/capstone/zip_to_market_unit_mapping.csv")

# Look at the table
display(zipmap.head(10))

ZIP_CODE,MARKET_KEY
57714,161
57772,161
67701,1135
67740,1135
67748,1135
69334,352
69336,352
69341,352
69347,161
69358,352


In [0]:
from pyspark.sql.functions import to_date # rewrite this to use the already loaded package F

# convert MARKET_KEY to shorttype
parquetDF = parquetDF.withColumn("MARKET_KEY", F.col("MARKET_KEY").cast(ShortType()))

# Convert the 'DATE' column to datetime
parquetDF = parquetDF.withColumn('DATE', F.to_date(parquetDF['DATE'], 'yyyy-MM-dd'))

# Check the DataFrame schema again to confirm the change
parquetDF.printSchema()

root
 |-- DATE: date (nullable = true)
 |-- MARKET_KEY: short (nullable = true)
 |-- CALORIC_SEGMENT: string (nullable = true)
 |-- CATEGORY: string (nullable = true)
 |-- UNIT_SALES: double (nullable = true)
 |-- DOLLAR_SALES: double (nullable = true)
 |-- MANUFACTURER: string (nullable = true)
 |-- BRAND: string (nullable = true)
 |-- PACKAGE: string (nullable = true)
 |-- ITEM: string (nullable = true)



In [0]:
display(parquetDF.head(5))

DATE,MARKET_KEY,CALORIC_SEGMENT,CATEGORY,UNIT_SALES,DOLLAR_SALES,MANUFACTURER,BRAND,PACKAGE,ITEM
2021-08-21,524,DIET/LIGHT,SSD,69.0,389.74,SWIRE-CC,DIET YAWN,12SMALL 12ONE CUP,YAWN ZERO SUGAR GENTLE DRINK SUPER-JUICE DURIAN CUP 12 LIQUID SMALL X12
2022-05-07,637,REGULAR,SSD,4.0,30.96,COCOS,GORGEOUS ORANGEOUS,12SMALL 12ONE CUP,GORGEOUS SUNSET OUS GENTLE DRINK AVOCADO CUP 12 LIQUID SMALL X12
2022-10-22,628,DIET/LIGHT,ING ENHANCED WATER,1.0,2.25,JOLLYS,DIGRESS FLAVORED,20SMALL MULTI JUG,DIGRESS ZERO NUTRIENT ENHANCED WATER BVRG PURPLE ZERO CALORIE JUG 20 LIQUID SMALL
2022-08-13,216,REGULAR,SSD,3.0,7.55,COCOS,CHERRY FIZZ,1L MULTI JUG,KOOL! RED GENTLE DRINK RED COLA CONTOUR JUG 33.8 LIQUID SMALL
2022-01-01,210,REGULAR,SSD,4.0,25.96,COCOS,RADIANT'S,12SMALL 12ONE CUP,RADIANT'S GENTLE DRINK GINGER ALE CUP 12 LIQUID SMALL X12


In [0]:
zipmap.printSchema()

root
 |-- ZIP_CODE: string (nullable = true)
 |-- MARKET_KEY: string (nullable = true)



In [0]:
min_key = parquetDF.agg(F.min('MARKET_KEY')).collect()[0][0]
max_key = parquetDF.agg(F.max('MARKET_KEY')).collect()[0][0]

print("Minimum Market Key: ", min_key)
print("Maximum Market Key: ", max_key)

Minimum Market Key:  1
Maximum Market Key:  6802


In [0]:
# from pyspark.sql.functions import col # should be included from previous load

# Convert the 'MARKET_KEY' column in 'zipmap' to ShortType
zipmap = zipmap.withColumn("MARKET_KEY", F.col("MARKET_KEY").cast(ShortType()))

In [0]:
# Group by 'MARKET_KEY' and aggregate 'ZIP_CODE' into a list
zipmap_agg = zipmap.groupBy('MARKET_KEY').agg(F.collect_list('ZIP_CODE').alias('ZIP_CODES'))

# Join the dataframes
mergedDF = parquetDF.join(zipmap_agg, on='MARKET_KEY', how='left')

In [0]:
display(mergedDF.head(5))

MARKET_KEY,DATE,CALORIC_SEGMENT,CATEGORY,UNIT_SALES,DOLLAR_SALES,MANUFACTURER,BRAND,PACKAGE,ITEM,ZIP_CODES
524,2021-08-21,DIET/LIGHT,SSD,69.0,389.74,SWIRE-CC,DIET YAWN,12SMALL 12ONE CUP,YAWN ZERO SUGAR GENTLE DRINK SUPER-JUICE DURIAN CUP 12 LIQUID SMALL X12,"List(85087, 85266, 85324, 85331, 85377, 86333)"
637,2022-05-07,REGULAR,SSD,4.0,30.96,COCOS,GORGEOUS ORANGEOUS,12SMALL 12ONE CUP,GORGEOUS SUNSET OUS GENTLE DRINK AVOCADO CUP 12 LIQUID SMALL X12,"List(98133, 98155, 98028, 98103, 98115, 98117, 98125)"
628,2022-10-22,DIET/LIGHT,ING ENHANCED WATER,1.0,2.25,JOLLYS,DIGRESS FLAVORED,20SMALL MULTI JUG,DIGRESS ZERO NUTRIENT ENHANCED WATER BVRG PURPLE ZERO CALORIE JUG 20 LIQUID SMALL,List(95838)
216,2022-08-13,REGULAR,SSD,3.0,7.55,COCOS,CHERRY FIZZ,1L MULTI JUG,KOOL! RED GENTLE DRINK RED COLA CONTOUR JUG 33.8 LIQUID SMALL,"List(86047, 85924, 85928, 85933, 85936, 85937, 85939, 86025, 86028, 86029, 86031, 86032, 86034, 86042)"
210,2022-01-01,REGULAR,SSD,4.0,25.96,COCOS,RADIANT'S,12SMALL 12ONE CUP,RADIANT'S GENTLE DRINK GINGER ALE CUP 12 LIQUID SMALL X12,"List(85325, 85344, 85348, 85357, 85360, 86403, 86404, 86405, 86406, 86436, 86438)"


In [0]:
num_rows = mergedDF.count()
print(f"Number of rows: {num_rows}")

num_cols = len(mergedDF.columns)
print(f"Number of columns: {num_cols}")

Number of rows: 24461424
Number of columns: 11


In [0]:
num_rows = parquetDF.count()
print(f"Number of rows in original df: {num_rows}")

num_cols = len(parquetDF.columns)
print(f"Number of columns in original df: {num_cols}")

Number of rows in original df: 24461424
Number of columns in original df: 10


In [0]:
mergedDF.printSchema()

root
 |-- MARKET_KEY: short (nullable = true)
 |-- DATE: date (nullable = true)
 |-- CALORIC_SEGMENT: string (nullable = true)
 |-- CATEGORY: string (nullable = true)
 |-- UNIT_SALES: double (nullable = true)
 |-- DOLLAR_SALES: double (nullable = true)
 |-- MANUFACTURER: string (nullable = true)
 |-- BRAND: string (nullable = true)
 |-- PACKAGE: string (nullable = true)
 |-- ITEM: string (nullable = true)
 |-- ZIP_CODES: array (nullable = true)
 |    |-- element: string (containsNull = false)



In [0]:
min_date = mergedDF.agg(F.min('DATE')).collect()[0][0]
max_date = mergedDF.agg(F.max('DATE')).collect()[0][0]

print("Minimum Date: ", min_date)
print("Maximum Date: ", max_date)

Minimum Date:  2020-12-05
Maximum Date:  2023-10-28


In [0]:
# Select numeric columns
numeric_cols = ['UNIT_SALES', 'DOLLAR_SALES']

# Get summary statistics for numeric columns
summary = mergedDF.select(numeric_cols).describe()

# Round the results to 2 decimal places
for col in numeric_cols:
    summary = summary.withColumn(col, F.round(summary[col], 2))

# Show the summary
display(summary)

summary,UNIT_SALES,DOLLAR_SALES
count,24461424.0,24461424.0
mean,174.37,591.14
stddev,857.81,3040.54
min,0.04,0.01
max,96776.0,492591.07


In [0]:
from pyspark.sql.functions import mean, stddev

mean_unit_sales = mergedDF.select(mean('UNIT_SALES')).collect()[0][0]
stddev_unit_sales = mergedDF.select(stddev('UNIT_SALES')).collect()[0][0]

mean_dollar_sales = mergedDF.select(mean('DOLLAR_SALES')).collect()[0][0]
stddev_dollar_sales = mergedDF.select(stddev('DOLLAR_SALES')).collect()[0][0]

from pyspark.sql.functions import col

mergedDF = mergedDF.withColumn('zscore_UNIT_SALES', (col('UNIT_SALES') - mean_unit_sales) / stddev_unit_sales)
mergedDF = mergedDF.withColumn('zscore_DOLLAR_SALES', (col('DOLLAR_SALES') - mean_dollar_sales) / stddev_dollar_sales)

In [0]:
display(mergedDF.head(5))

MARKET_KEY,DATE,CALORIC_SEGMENT,CATEGORY,UNIT_SALES,DOLLAR_SALES,MANUFACTURER,BRAND,PACKAGE,ITEM,ZIP_CODES,zscore_UNIT_SALES,zscore_DOLLAR_SALES
524,2021-08-21,DIET/LIGHT,SSD,69.0,389.74,SWIRE-CC,DIET YAWN,12SMALL 12ONE CUP,YAWN ZERO SUGAR GENTLE DRINK SUPER-JUICE DURIAN CUP 12 LIQUID SMALL X12,"List(85087, 85266, 85324, 85331, 85377, 86333)",-0.1228371522573727,-0.0662382727011347
637,2022-05-07,REGULAR,SSD,4.0,30.96,COCOS,GORGEOUS ORANGEOUS,12SMALL 12ONE CUP,GORGEOUS SUNSET OUS GENTLE DRINK AVOCADO CUP 12 LIQUID SMALL X12,"List(98133, 98155, 98028, 98103, 98115, 98117, 98125)",-0.198611473644821,-0.1842370597245729
628,2022-10-22,DIET/LIGHT,ING ENHANCED WATER,1.0,2.25,JOLLYS,DIGRESS FLAVORED,20SMALL MULTI JUG,DIGRESS ZERO NUTRIENT ENHANCED WATER BVRG PURPLE ZERO CALORIE JUG 20 LIQUID SMALL,List(95838),-0.2021087500165494,-0.1936794622426701
216,2022-08-13,REGULAR,SSD,3.0,7.55,COCOS,CHERRY FIZZ,1L MULTI JUG,KOOL! RED GENTLE DRINK RED COLA CONTOUR JUG 33.8 LIQUID SMALL,"List(86047, 85924, 85928, 85933, 85936, 85937, 85939, 86025, 86028, 86029, 86031, 86032, 86034, 86042)",-0.1997772324353971,-0.1919363506667065
210,2022-01-01,REGULAR,SSD,4.0,25.96,COCOS,RADIANT'S,12SMALL 12ONE CUP,RADIANT'S GENTLE DRINK GINGER ALE CUP 12 LIQUID SMALL X12,"List(85325, 85344, 85348, 85357, 85360, 86403, 86404, 86405, 86406, 86436, 86438)",-0.198611473644821,-0.1858815046075574


In [0]:
import pandas as pd

# Calculate null counts
null_counts = mergedDF.agg(*[F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in parquetDF.columns])

# Convert to Pandas DataFrame
null_counts_pd = null_counts.toPandas()

# Transpose the DataFrame
null_counts_pd = null_counts_pd.transpose()

# Reset the index
null_counts_pd.reset_index(inplace=True)

# Rename the columns
null_counts_pd.columns = ['Column', 'Null Count']

# Display the DataFrame
display(null_counts_pd)

Column,Null Count
DATE,0
MARKET_KEY,0
CALORIC_SEGMENT,59725
CATEGORY,0
UNIT_SALES,0
DOLLAR_SALES,0
MANUFACTURER,0
BRAND,0
PACKAGE,0
ITEM,0


In [0]:
mergedDF.printSchema()

root
 |-- MARKET_KEY: short (nullable = true)
 |-- DATE: date (nullable = true)
 |-- CALORIC_SEGMENT: string (nullable = true)
 |-- CATEGORY: string (nullable = true)
 |-- UNIT_SALES: double (nullable = true)
 |-- DOLLAR_SALES: double (nullable = true)
 |-- MANUFACTURER: string (nullable = true)
 |-- BRAND: string (nullable = true)
 |-- PACKAGE: string (nullable = true)
 |-- ITEM: string (nullable = true)
 |-- ZIP_CODES: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- zscore_UNIT_SALES: double (nullable = true)
 |-- zscore_DOLLAR_SALES: double (nullable = true)



In [0]:
from pyspark.sql.types import IntegerType, FloatType, ShortType
from pyspark.ml.feature import StringIndexer

# Downcast numeric columns
mergedDF = mergedDF.withColumn('MARKET_KEY', mergedDF['MARKET_KEY'].cast(IntegerType()))
mergedDF = mergedDF.withColumn('UNIT_SALES', F.round(mergedDF['UNIT_SALES'].cast(FloatType()), 2))
mergedDF = mergedDF.withColumn('DOLLAR_SALES', F.round(mergedDF['DOLLAR_SALES'].cast(FloatType()), 2))
mergedDF = mergedDF.withColumn('zscore_UNIT_SALES', F.round(mergedDF['zscore_UNIT_SALES'].cast(FloatType()), 2))
mergedDF = mergedDF.withColumn('zscore_DOLLAR_SALES', F.round(mergedDF['zscore_DOLLAR_SALES'].cast(FloatType()), 2))

# Extract year, month, and day from DATE and cast to ShortType
mergedDF = mergedDF.withColumn('Year', F.year('DATE').cast(ShortType()))
mergedDF = mergedDF.withColumn('Month', F.month('DATE').cast(ShortType()))
mergedDF = mergedDF.withColumn('Day', F.dayofmonth('DATE').cast(ShortType()))

# # Convert object columns to category, drop original columns, and rename new ones
# indexer_cols = ['CALORIC_SEGMENT', 'CATEGORY', 'MANUFACTURER', 'BRAND', 'PACKAGE', 'ITEM']
# for col in indexer_cols:
#     indexer = StringIndexer(inputCol=col, outputCol=col+"_index")
#     mergedDF = indexer.fit(mergedDF).transform(mergedDF)
#     mergedDF = mergedDF.drop(col)
#     mergedDF = mergedDF.withColumnRenamed(col+"_index", col)

In [0]:
mergedDF.printSchema()

root
 |-- MARKET_KEY: integer (nullable = true)
 |-- DATE: date (nullable = true)
 |-- CALORIC_SEGMENT: string (nullable = true)
 |-- CATEGORY: string (nullable = true)
 |-- UNIT_SALES: float (nullable = true)
 |-- DOLLAR_SALES: float (nullable = true)
 |-- MANUFACTURER: string (nullable = true)
 |-- BRAND: string (nullable = true)
 |-- PACKAGE: string (nullable = true)
 |-- ITEM: string (nullable = true)
 |-- ZIP_CODES: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- zscore_UNIT_SALES: float (nullable = true)
 |-- zscore_DOLLAR_SALES: float (nullable = true)
 |-- Year: short (nullable = true)
 |-- Month: short (nullable = true)
 |-- Day: short (nullable = true)



## Modeling

### XGBoost

In [0]:
# dbutils.library.installPyPI("xgboost")

from xgboost import XGBRegressor
from pyspark.ml.feature import VectorAssembler

In [0]:
mergedDF = mergedDF.drop('DATE', 'ZIP_CODES')

In [0]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import XGBoostClassifier

# Define the input columns for the VectorAssembler
input_cols = [col for col in mergedDF.columns if col != 'target']

# Initialize the VectorAssembler
assembler = VectorAssembler(inputCols=input_cols, outputCol='features')

# Transform the DataFrame
mergedDF = assembler.transform(mergedDF)

[0;31m---------------------------------------------------------------------------[0m
[0;31mImportError[0m                               Traceback (most recent call last)
File [0;32m<command-3425296378987475>:2[0m
[1;32m      1[0m [38;5;28;01mfrom[39;00m [38;5;21;01mpyspark[39;00m[38;5;21;01m.[39;00m[38;5;21;01mml[39;00m[38;5;21;01m.[39;00m[38;5;21;01mfeature[39;00m [38;5;28;01mimport[39;00m VectorAssembler
[0;32m----> 2[0m [38;5;28;01mfrom[39;00m [38;5;21;01mpyspark[39;00m[38;5;21;01m.[39;00m[38;5;21;01mml[39;00m[38;5;21;01m.[39;00m[38;5;21;01mclassification[39;00m [38;5;28;01mimport[39;00m XGBoostClassifier
[1;32m      4[0m [38;5;66;03m# Define the input columns for the VectorAssembler[39;00m
[1;32m      5[0m input_cols [38;5;241m=[39m [col [38;5;28;01mfor[39;00m col [38;5;129;01min[39;00m mergedDF[38;5;241m.[39mcolumns [38;5;28;01mif[39;00m col [38;5;241m!=[39m [38;5;124m'[39m[38;5;124mtarget[39m[38;5;124m'[39m]

[0;31mIm

In [0]:
# Split the data into training and testing sets
train_data, test_data = df_transformed.randomSplit([0.7, 0.3])

# Check the number of rows in each DataFrame
print(f"Number of training records: {train_data.count()}")
print(f"Number of testing records : {test_data.count()}")

[0;31m---------------------------------------------------------------------------[0m
[0;31mPy4JJavaError[0m                             Traceback (most recent call last)
File [0;32m<command-3425296378987474>:5[0m
[1;32m      2[0m train_data, test_data [38;5;241m=[39m df_transformed[38;5;241m.[39mrandomSplit([[38;5;241m0.7[39m, [38;5;241m0.3[39m])
[1;32m      4[0m [38;5;66;03m# Check the number of rows in each DataFrame[39;00m
[0;32m----> 5[0m [38;5;28mprint[39m([38;5;124mf[39m[38;5;124m"[39m[38;5;124mNumber of training records: [39m[38;5;132;01m{[39;00mtrain_data[38;5;241m.[39mcount()[38;5;132;01m}[39;00m[38;5;124m"[39m)
[1;32m      6[0m [38;5;28mprint[39m([38;5;124mf[39m[38;5;124m"[39m[38;5;124mNumber of testing records : [39m[38;5;132;01m{[39;00mtest_data[38;5;241m.[39mcount()[38;5;132;01m}[39;00m[38;5;124m"[39m)

File [0;32m/databricks/spark/python/pyspark/instrumentation_utils.py:48[0m, in [0;36m_wrap_function.<locals>.wrap

In [0]:
params = {
    "max_depth": 7,
    "learning_rate": 0.1,
    "objective": "reg:squarederror",
    "num_estimators": 1000,
    "num_boost_round": 1000
}

### ARIMA

In [0]:
import matplotlib.pyplot as plt
from statsmodels.tsa.arima_model import ARIMA

In [0]:
from pyspark.sql.window import Window

# Define the window based on the date column
window = Window.orderBy(mergedDF['DATE']).rowsBetween(-11, 0)

# Calculate the rolling mean and standard deviation
rolmean = mergedDF.withColumn('rolling_mean', F.mean('UNIT_SALES').over(window))
rolstd = mergedDF.withColumn('rolling_std', F.stddev('UNIT_SALES').over(window))

In [0]:
# Calculate the rolling mean and standard deviation
df = mergedDF.withColumn('rolling_mean', F.mean('UNIT_SALES').over(window))
df = df.withColumn('rolling_std', F.stddev('UNIT_SALES').over(window))

# Display the DataFrame
display(df)

In [0]:
# Split data into two parts
X = mergedDF.select('UNIT_SALES').toPandas().values
split = len(X) // 2
X1, X2 = X[0:split], X[split:]

# Compare the mean and variance of each group
mean1, mean2 = X1.mean(), X2.mean()
var1, var2 = X1.var(), X2.var()

print('mean1=%f, mean2=%f' % (mean1, mean2))
print('variance1=%f, variance2=%f' % (var1, var2))

In [0]:
# Determine the number of partitions
num_partitions = 5  # Adjust this value based on your data size and available resources

# Repartition the DataFrame
repartitionedDF = mergedDF.repartition(num_partitions)

In [0]:
def process_partition(iterator):
    for row in iterator:
        # Process each row in the partition here
        pass

repartitionedDF.rdd.foreachPartition(process_partition)

In [0]:
from statsmodels.tsa.stattools import adfuller
import random

# Generate a random index
random_index = random.randint(0, num_partitions - 1)

# Grab a random partition
random_partition = (repartitionedDF
                    .rdd
                    .mapPartitionsWithIndex(lambda index, iterator: iterator if index == random_index else iter([]))
                    .sample(False, 0.1)  # Take a 10% sample of the data
                    .collect())

# Convert the random partition to a Pandas DataFrame
random_partition_df = pd.DataFrame(random_partition, columns=repartitionedDF.columns)

# Perform Dickey-Fuller test:
print('Results of Dickey-Fuller Test:')
dftest = adfuller(random_partition_df['UNIT_SALES'], autolag='AIC')

dfoutput = pd.Series(dftest[0:4], index=['Test Statistic','p-value','#Lags Used','Number of Observations Used'])
for key,value in dftest[4].items():
    dfoutput['Critical Value (%s)'%key] = value

In [0]:
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf

# Plot ACF
plot_acf(random_partition_df['UNIT_SALES'], lags=50)
plt.show()

# Plot PACF
plot_pacf(random_partition_df['UNIT_SALES'], lags=50)
plt.show()

In [0]:
from statsmodels.tsa.arima.model import ARIMA

# Fit an ARIMA model
model = ARIMA(random_partition_df['UNIT_SALES'], order=(0, 0, 0))
model_fit = model.fit()

# Summary of the model
print(model_fit.summary())