Business Goal 5: We will identify how Glossier demand is affected by COVID-19 rates and forecast the relationship for the next year.

Technical Proposal: To accomplish this, we will join external COVID-19 rate data to the Glossier subreddit activity data by day. As above, we will count the posts and comments that mention Glossier each day. We will also aggregate total COVID-19 cases by day. To measure the effect of COVID-19 rates on demand, we will develop a multivariate time series ML model that forecasts disease rates jointly with demand. As mentioned above, we will also plot both variables and their forecasts on line charts so that relationships and patterns over time are easy to see.

In [0]:
# loading data
df_covid = spark.read.format("csv").option("header","true").load("dbfs:/FileStore/WHO_covid_data/WHO_COVID_19_global_data.csv")

In [0]:
df_covid.show()

+-------------+------------+-----------+----------+---------+----------------+----------+-----------------+
|Date_reported|Country_code|    Country|WHO_region|New_cases|Cumulative_cases|New_deaths|Cumulative_deaths|
+-------------+------------+-----------+----------+---------+----------------+----------+-----------------+
|   2020-01-03|          AF|Afghanistan|      EMRO|        0|               0|         0|                0|
|   2020-01-04|          AF|Afghanistan|      EMRO|        0|               0|         0|                0|
|   2020-01-05|          AF|Afghanistan|      EMRO|        0|               0|         0|                0|
|   2020-01-06|          AF|Afghanistan|      EMRO|        0|               0|         0|                0|
|   2020-01-07|          AF|Afghanistan|      EMRO|        0|               0|         0|                0|
|   2020-01-08|          AF|Afghanistan|      EMRO|        0|               0|         0|                0|
|   2020-01-09|          AF|

In [0]:
## Read in glossier data 
glos_comments = spark.read.parquet("/FileStore/glossier/glossier_comments")
glos_submissions = spark.read.parquet("/FileStore/glossier/glossier_submissions")

In [0]:
# first let's filter out NSFW submissions
glos_submissions = glos_submissions.filter(glos_submissions.over_18 == False)

# now let's filter out moderators/admins
glos_comments = glos_comments.filter(glos_comments.distinguished.isNull())
glos_submissions = glos_submissions.filter(glos_submissions.distinguished.isNull())

In [0]:
# keeping only necessary columns
tokeep_comments = ["author", "created_utc", "subreddit", "score", "body"]
tokeep_submissions = ["author", "created_utc", "subreddit", "score", "title", "selftext"]

glos_comments = glos_comments.select(*tokeep_comments)
glos_submissions = glos_submissions.select(*tokeep_submissions)

In [0]:
# combining title and self text to analyze whole submission
from pyspark.sql.functions import concat_ws
glos_submissions = glos_submissions.select(
    "author", "created_utc", "subreddit", "score",
    concat_ws(" ", glos_submissions.title, glos_submissions.selftext).alias("body"),
)
glos_submissions.show(5)

+-----------------+-----------+------------------+-----+--------------------+
|           author|created_utc|         subreddit|score|                body|
+-----------------+-----------+------------------+-----+--------------------+
|        Mimizzzzz| 1630435547|          glossier|  102|Spotted 👀 glossi...|
|chicagoturkergirl| 1645982905|MoneyDiariesACTIVE|   98|I am 44 years old...|
|    creditphoenix| 1627477898|          glossier|   12|Closest thing to ...|
|     aquaholic789| 1643163837|   SkincareAddicts|    2|How to work your ...|
|        [deleted]| 1630689612|          glossier|    1|Seeking lip shade...|
+-----------------+-----------+------------------+-----+--------------------+
only showing top 5 rows



In [0]:
# combining datasets (matching columns by name rather than by position)
df_concat = glos_submissions.unionByName(glos_comments)
df_concat.show(5)

+-----------------+-----------+------------------+-----+--------------------+
|           author|created_utc|         subreddit|score|                body|
+-----------------+-----------+------------------+-----+--------------------+
|        Mimizzzzz| 1630435547|          glossier|  102|Spotted 👀 glossi...|
|chicagoturkergirl| 1645982905|MoneyDiariesACTIVE|   98|I am 44 years old...|
|    creditphoenix| 1627477898|          glossier|   12|Closest thing to ...|
|     aquaholic789| 1643163837|   SkincareAddicts|    2|How to work your ...|
|        [deleted]| 1630689612|          glossier|    1|Seeking lip shade...|
+-----------------+-----------+------------------+-----+--------------------+
only showing top 5 rows



In [0]:
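# dropping rows with a null body or a "[removed]" placeholder body
# note: submission bodies are title + selftext, so the exact "[removed]" match mainly catches comments
df_concat = df_concat.filter(df_concat.body.isNotNull())
df_concat = df_concat.filter(df_concat.body != "[removed]")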

In [0]:
# let's only look at the glossier subreddit for this viz
df_concat = df_concat.filter(df_concat.subreddit == "glossier")

In [0]:
(df_concat.count(), len(df_concat.columns))

Out[10]: (104304, 5)

In [0]:
## Convert the columns to appropriate data type 
df_concat = df_concat.withColumn("created_utc",df_concat.created_utc.cast('timestamp'))
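
The cast above treats `created_utc` as seconds since the Unix epoch. As a minimal plain-Python sketch of the same conversion, the snippet below converts the first `created_utc` value from the sample rows (1630435547) to a UTC calendar date. The helper name `epoch_to_utc_date` is hypothetical and not part of the notebook. One assumption worth checking: Spark's `to_date` derives the day from the session time zone, so the daily buckets only match this UTC result if the cluster runs in UTC.

```python
from datetime import datetime, timezone


def epoch_to_utc_date(epoch_seconds: int):
    """Hypothetical helper: convert Unix epoch seconds to a UTC calendar date."""
    return datetime.fromtimestamp(epoch_seconds, tz=timezone.utc).date()


# First created_utc value from the sample rows shown earlier
print(epoch_to_utc_date(1630435547))  # 2021-08-31
```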

In [0]:
## Get the count of records for each day
# explicit imports instead of a wildcard; note that `sum` here shadows Python's built-in sum
from pyspark.sql.functions import count, sum, to_date

df_agg_glossier = (
    df_concat.groupBy(to_date("created_utc").alias("dt"))
    .agg(count("body").alias("activity_count"))
    .sort("dt")
    .toPandas()
)
df_agg_glossier.head()

           dt  activity_count
0  2021-01-01             179
1  2021-01-02             204
2  2021-01-03             187
3  2021-01-04             267
4  2021-01-05             301


In [0]:
df_covid.printSchema()

root
 |-- Date_reported: string (nullable = true)
 |-- Country_code: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- WHO_region: string (nullable = true)
 |-- New_cases: string (nullable = true)
 |-- Cumulative_cases: string (nullable = true)
 |-- New_deaths: string (nullable = true)
 |-- Cumulative_deaths: string (nullable = true)



In [0]:
# now let's agg the WHO data by day
# note: the CSV was loaded without a schema, so these columns are strings and Spark casts them implicitly when summing
df_agg_covid = (
    df_covid.groupBy(to_date("Date_reported").alias("dt"))
    .agg(
        sum("New_cases").cast("int").alias("daily_new_cases"),
        sum("Cumulative_cases").cast("int").alias("cumulative_cases"),
        sum("New_deaths").cast("int").alias("daily_new_deaths"),
        sum("Cumulative_deaths").cast("int").alias("cumulative_deaths"),
    )
    .sort("dt")
    .toPandas()
)
df_agg_covid.head()

           dt  daily_new_cases  cumulative_cases  daily_new_deaths  cumulative_deaths
0  2020-01-03                0                 0                 0                  0
1  2020-01-04                3                 3                 0                  0
2  2020-01-05                0                 3                 0                  0
3  2020-01-06                3                 6                 0                  0
4  2020-01-07                0                 6                 0                  0


In [0]:
df_agg_covid.dtypes

Out[15]: dt                   object
daily_new_cases       int32
cumulative_cases      int32
daily_new_deaths      int32
cumulative_deaths     int32
dtype: object

In [0]:
df_agg_glossier.dtypes

Out[16]: dt                object
activity_count     int64
dtype: object

In [0]:
# converting to datetime
import pandas as pd

df_agg_covid['dt'] =  pd.to_datetime(df_agg_covid['dt'], format='%Y-%m-%d')
df_agg_glossier['dt'] =  pd.to_datetime(df_agg_glossier['dt'], format='%Y-%m-%d')

df_agg_covid.dtypes



Out[17]: dt                   datetime64[ns]
daily_new_cases               int32
cumulative_cases              int32
daily_new_deaths              int32
cumulative_deaths             int32
dtype: object

In [0]:
# now using an inner join to join the two datasets
final_df = pd.merge(df_agg_glossier, df_agg_covid, how ='inner', on ='dt')
final_df.head(20)

           dt  activity_count  daily_new_cases  cumulative_cases  daily_new_deaths  cumulative_deaths
0  2021-01-01             179          1797638          83677827             26840            1936946
1  2021-01-02             204           412196          84090023              8543            1945489
2  2021-01-03             187           328962          84418985              7884            1953373
3  2021-01-04             267           352863          84771848              7819            1961192
4  2021-01-05             301           384608          85156456              8838            1970030
5  2021-01-06             252           489253          85645709             11052            1981082
6  2021-01-07             177           555086          86200795             10729            1991811
7  2021-01-08             221          2117364          88318159             29701            2021512
8  2021-01-09             213           532748          88850907             11484            2032996
9  2021-01-10             217           489057          89339964              9465            2042461
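
The inner join keeps only dates present in both tables, so any day missing from either source is dropped silently. Below is a minimal sketch of one way to check for this, using the `indicator` and `validate` options of `pd.merge`. The toy frames reuse a few values from the outputs above, but their date ranges are deliberately misaligned for illustration, and the names `activity`, `covid`, `check`, and `unmatched` are assumptions rather than notebook variables.

```python
import pandas as pd

# Toy stand-ins for df_agg_glossier and df_agg_covid (date ranges offset on purpose)
activity = pd.DataFrame({
    "dt": pd.to_datetime(["2021-01-01", "2021-01-02", "2021-01-03"]),
    "activity_count": [179, 204, 187],
})
covid = pd.DataFrame({
    "dt": pd.to_datetime(["2021-01-02", "2021-01-03", "2021-01-04"]),
    "daily_new_cases": [412196, 328962, 352863],
})

# An outer join with indicator=True labels each row by the side it came from;
# validate="one_to_one" raises if either table has duplicate dates
check = pd.merge(activity, covid, how="outer", on="dt",
                 indicator=True, validate="one_to_one")

# Rows that an inner join would have dropped
unmatched = check[check["_merge"] != "both"]
print(unmatched[["dt", "_merge"]])
```

If `unmatched` is empty on the real data, the inner join loses nothing; otherwise the listed dates show where one source has a gap.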


In [0]:
import os
fpath = os.path.join("/Workspace/Repos/cag199@georgetown.edu/fall-2022-reddit-big-data-project-project-group-16/data/csv/", "glossier_covid.csv")
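# index=False keeps the pandas row index out of the file (otherwise it reappears as an "Unnamed: 0" column on re-read)
final_df.to_csv(fpath, index=False)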

In [0]:
# visualizing
import plotly.express as px

fig = px.line(final_df, x="dt", y="daily_new_cases")
fig.update_layout(
    title={'text':"COVID New Cases From Jan 2021 - Aug 2022", 'xanchor': 'center', 'yanchor': 'top','y':0.9,'x':0.5},
    xaxis_title="Day",
    yaxis_title="COVID New Cases",
    template="plotly_white",
    font=dict(
        size=12,
        color="Black"))
fig.show()
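
The daily counts in the merged table show sharp reporting spikes (for example 1,797,638 new cases on 2021-01-01 against 412,196 the next day), which can dominate a line chart. One common way to smooth these, sketched here as an option rather than as part of the original analysis, is a trailing 7-day rolling mean. The `sample` frame and the `cases_7d_avg` column name are assumptions; the values are the first ten `daily_new_cases` rows shown above.

```python
import pandas as pd

# First ten daily_new_cases values from the merged table above
sample = pd.DataFrame({
    "dt": pd.date_range("2021-01-01", periods=10, freq="D"),
    "daily_new_cases": [1797638, 412196, 328962, 352863, 384608,
                        489253, 555086, 2117364, 532748, 489057],
})

# Trailing 7-day mean; the first six rows are NaN because the window is incomplete
sample["cases_7d_avg"] = sample["daily_new_cases"].rolling(window=7).mean()
print(sample)
```

The smoothed column could then be passed as `y` to `px.line` in place of `daily_new_cases`.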

In [0]:
# visualizing
import plotly.express as px

fig = px.line(final_df, x="dt", y="activity_count")
fig.update_layout(
    title={'text':"Glossier Activity From Jan 2021 - Aug 2022", 'xanchor': 'center', 'yanchor': 'top','y':0.9,'x':0.5},
    xaxis_title="Day",
    yaxis_title="Subreddit Activity (Comments + Posts)",
    template="plotly_white",
    font=dict(
        size=12,
        color="Black"))
fig.show()

In [0]:
import plotly.graph_objects as go
from plotly.subplots import make_subplots
 
fig = make_subplots(specs=[[{"secondary_y": True}]])
 
fig.add_trace(
    go.Scatter(x=final_df["dt"], y=final_df["daily_new_cases"], name="COVID Daily New Cases", mode='lines'),
    secondary_y=False)
 
# Plot Glossier activity on the secondary y-axis (secondary_y=True)
fig.add_trace(
    go.Scatter(x=final_df["dt"], y=final_df["activity_count"], name="Glossier Activity", mode='lines'),
    secondary_y=True)
 
# Adding title text to the figure
fig.update_layout(
    title_text="Glossier Activity And COVID Cases From Jan 2021 - Aug 2022", plot_bgcolor = "white"
)

# Naming x-axis
fig.update_xaxes(title_text="Day")
 
# Naming y-axes
fig.update_yaxes(title_text="COVID New Cases", secondary_y=False)
fig.update_yaxes(title_text="Glossier Activity (Comments + Posts)", secondary_y=True)
fpath = os.path.join("/Workspace/Repos/cag199@georgetown.edu/fall-2022-reddit-big-data-project-project-group-16/data/plots/", "covid_viz.html")
fig.write_html(fpath)
fig.show()
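
The dual-axis chart shows the two series side by side but does not quantify how they move together. As an exploratory sketch only (not the multivariate forecasting model described in the proposal), the snippet below computes the Pearson correlation between activity and cases shifted by 0 to `max_lag` days. The helper `lagged_correlations` is hypothetical, and the data is synthetic: `activity_count` is constructed to follow `daily_new_cases` with a 3-day delay so the technique can be checked. Correlations on trending series can be spurious, so results on the real data would need care.

```python
import numpy as np
import pandas as pd


def lagged_correlations(df, x_col, y_col, max_lag=14):
    """Hypothetical helper: Pearson correlation of y with x shifted by 0..max_lag days."""
    return pd.Series(
        {lag: df[y_col].corr(df[x_col].shift(lag)) for lag in range(max_lag + 1)},
        name="pearson_r",
    )


# Synthetic data only: activity is built to follow cases with a 3-day delay
rng = np.random.default_rng(0)
cases = pd.Series(rng.normal(size=200)).rolling(5, min_periods=1).mean()
toy = pd.DataFrame({
    "daily_new_cases": cases,
    "activity_count": cases.shift(3) + rng.normal(scale=0.05, size=200),
})

corrs = lagged_correlations(toy, "daily_new_cases", "activity_count", max_lag=7)
print(corrs)
print("lag with strongest correlation:", corrs.idxmax())
```

On the real data the call would look like `lagged_correlations(final_df, "daily_new_cases", "activity_count")`, assuming `final_df` is sorted by `dt` with one row per day.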