# Icecream Sales Forecasting

1. This notebook demonstrates BigLake firegrained permissions
2. The dataset is the free [Kaggle Icecream Revenue](https://www.kaggle.com/vinicius150987/ice-cream-revenue) dataset 
3. A precreated BigLake table is backed by the Kaggle CSV file in Cloud Storage, and is secured with column-level security and row level security
4. The notebook first demonstrates PySpark, powered by Cloud Dataproc (personal auth cluster) to cleanse & transform data
5. It then forecasts revenue with Prophet (plain Python, not Spark)

### 1. Imports

In [None]:
import pandas as pd
from prophet import Prophet
from pyspark.sql.functions import month, date_format
from pyspark.sql.types import IntegerType
from pyspark.sql import SparkSession
import warnings
warnings.filterwarnings('ignore')

### 2. Create a Spark session powered by Cloud Dataproc 

In [None]:
spark = SparkSession.builder.appName('Icecream Sales Analysis').getOrCreate()
spark

### 3. Read the BigLake table IceCreamSales via PySpark and the Spark BigQuery Connector

In [None]:
project_id_output = !gcloud config list --format "value(core.project)" 2>/dev/null
PROJECT_ID = project_id_output[0]
print("PROJECT_ID: ", PROJECT_ID)

In [None]:
project_name_output = !gcloud projects describe $PROJECT_ID | grep name | cut -d':' -f2 | xargs
PROJECT_NAME = project_name_output[0]
print("PROJECT_NAME: ", PROJECT_NAME)

In [None]:
rawDF = spark.read \
  .format("bigquery") \
  .load(f"{PROJECT_NAME}.biglake_dataset.IceCreamSales") 

### 4. Explore the raw IceCreamSales data

In [None]:
rawDF.show(10)

In [None]:
rawDF.createOrReplaceTempView("icecream_sales")

In [None]:
# Count total rows
spark.sql("select count(*) as row_count from icecream_sales").show()

In [None]:
# Date range of historical sales data
spark.sql("select min(month) date_range_start, max(month) date_range_end from icecream_sales").show()

### 5. Row level security powered by BigLake fine grained permissions
Depending on permissions, the persona executing the cell below may or may not see rows.<br>
e.g. usa_user@ can only see data for United States<br>
e.g. aus_user@ can only see data for Australia<br>
e.g. mkt_user@ should see rows for all countries<br>

In [None]:
# Countries listing
spark.sql("select distinct country from icecream_sales").show()

In [None]:
# Row count by country
spark.sql("select country, count(*) as row_count from icecream_sales group by country").show()

In [None]:
# Data for Australia
spark.sql("select * from icecream_sales where country='Australia' limit 10").show()

In [None]:
# Data for United States
spark.sql("select * from icecream_sales where country='United States' limit 10").show()

### 6. Column level security powered by BigLake fine grained permissions
Depending on permissions, the persona executing the cell below may or may not see rows (covered above) and columns.<br>
e.g. usa_user@ can only see data for (country=)United States, and can see columns Discount and Net_Revenue<br>
e.g. aus_user@ can only see data for (country=)Australia, and can see columns Discount and Net_Revenue<br>
e.g. mkt_user@ should see rows for all countries and cannot see columns Discount and Net_Revenue<br>

### 7. Subset and format the data, in preparation for forecasting

In [None]:
cleanedDF = rawDF.select(date_format('Month', 'MM/yyyy').alias('ds'),"gross_revenue").withColumn("revenue", rawDF["Gross_Revenue"].cast(IntegerType()))
cleanedDF.show(10)

### 8. Aggregate the data, in preparation for forecasting

In [None]:
groupedDF = cleanedDF.groupBy('ds').sum("Revenue").withColumnRenamed("sum(Revenue)", "y")
groupedDF.show(10)

### 9. Convert the aggregated "small" data to a Pandas dataframe for forecasting with Prophet

In [None]:
historyPDF = groupedDF.toPandas()

### 10. Instantiate a model

In [None]:
model = Prophet(
    interval_width=0.90
)

### 11. Fit the model with the Pandas dataframe of aggregated revenue by month and year

In [None]:
model.fit(historyPDF)

### 12. Create a Prophet future dataframe 

In [None]:
futureDatePDF = model.make_future_dataframe(
    periods=6,
    freq='m',
    include_history=True
)

### 13. Forecast future sales

In [None]:
forecastPDF = model.predict(futureDatePDF)

### 14. Review the predictions

In [None]:
forecastPDF[['ds', 'yhat', 'yhat_lower', 'yhat_upper']].tail()

### 15. Plot the sales

In [None]:
predictionPlot = model.plot(forecastPDF, xlabel='date', ylabel='sales')