In [4]:
from dotenv import dotenv_values
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from google.cloud import storage

HOME = "/home/jovyan"
config = dotenv_values(f"{HOME}/.env")
CREDENTIALS = f"{HOME}/gcp_credentials.json"

# Bucket is named "<project-name>-<bucket-name>", both values read from the .env file.
BUCKET_NAME = "-".join([config["project-name"], config["bucket-name"]])
GCS_CLIENT = storage.Client.from_service_account_json(CREDENTIALS)

# Raw input datasets inside the bucket.
INDUSTRIES_PATH = f"gs://{BUCKET_NAME}/raw/industries.parquet"
COMPANIES_PATH = f"gs://{BUCKET_NAME}/raw/companies.parquet"
SHAREPRICES_PATH = f"gs://{BUCKET_NAME}/raw/shareprices.parquet"

def get_file_names(folder, include_subfolders=False):
    """List object names under ``folder`` in ``BUCKET_NAME``, relative to that folder.

    Args:
        folder: Folder path inside the bucket. A trailing "/" is tolerated; an empty
            string lists the bucket root.
        include_subfolders: When True, return every object's path relative to
            ``folder`` (nested paths included, as listed by GCS). When False, return
            only the names of the immediate children of ``folder`` (files or
            sub-folders), without trailing slashes and without duplicates.

    Returns:
        A list of names in the order the bucket listing produced them.
    """
    folder = folder.rstrip("/")
    # A trailing "/" on the prefix keeps sibling folders that merely share the name
    # prefix (e.g. "processed_old/") out of the listing.
    prefix = f"{folder}/" if folder else ""
    bucket_handle = GCS_CLIENT.bucket(BUCKET_NAME)
    names = [blob.name[len(prefix):] for blob in bucket_handle.list_blobs(prefix=prefix)]
    if include_subfolders:
        return names

    # A Spark parquet dataset is a directory of part files, and GCS may or may not hold
    # a "dir/" placeholder object for it, so reduce every path to its first component
    # instead of relying on placeholders. dict.fromkeys dedupes while keeping order;
    # empty components (the folder's own placeholder) are dropped.
    first_components = (name.split("/", 1)[0] for name in names)
    return list(dict.fromkeys(c for c in first_components if c))

def get_processed_year_months():
    """Return the sorted, distinct year-months ("YYYYMM") already written to ``processed/``.

    Processed datasets are written as ``{YYYYMM}_shares.parquet``, so the year-month is
    the first six characters of each name. Entries that do not start with six digits
    (stray files, marker objects) are ignored; otherwise something like "_SUCCESS" would
    yield "_SUCCE", which sorts after every digit string and would be mistaken for the
    latest processed month.
    """
    year_months = {
        name[:6]
        for name in get_file_names("processed")
        if len(name) >= 6 and name[:6].isdigit()
    }
    return sorted(year_months)

processed_year_months = get_processed_year_months()
# On a first run processed/ is empty: use None (matches no month) so every month gets
# written, instead of failing with IndexError on an empty list.
last_processed_year_month = processed_year_months[-1] if processed_year_months else None

In [5]:
# Year-month prefixes (YYYYMM) of the datasets already present under processed/.
processed_year_months

['201702',
 '201703',
 '201704',
 '201705',
 '201706',
 '201707',
 '201708',
 '201709',
 '201710',
 '201711',
 '201712',
 '201801',
 '201802',
 '201803',
 '201804',
 '201805',
 '201806',
 '201807',
 '201808',
 '201809',
 '201810',
 '201811',
 '201812',
 '201901',
 '201902',
 '201903',
 '201904',
 '201905',
 '201906',
 '201907',
 '201908',
 '201909',
 '201910',
 '201911',
 '201912',
 '202001',
 '202002',
 '202003',
 '202004',
 '202005',
 '202006',
 '202007',
 '202008',
 '202009',
 '202010',
 '202011',
 '202012',
 '202101',
 '202102',
 '202103',
 '202104',
 '202105',
 '202106',
 '202107',
 '202108',
 '202109',
 '202110',
 '202111',
 '202112',
 '202201',
 '202202']

In [23]:
# Local Spark session (master "local" = a single worker thread).
spark = SparkSession.builder.master("local").appName("test").getOrCreate()
# Hand the service-account key file to the GCS Hadoop connector used for gs:// paths.
spark._jsc.hadoopConfiguration().set(
    "google.cloud.auth.service.account.json.keyfile", CREDENTIALS
)

In [24]:
# Load the raw input tables. Parquet stores its own schema, so no reader options are
# needed; in particular "header" is a CSV-only option that the parquet reader ignores.
industries = spark.read.parquet(INDUSTRIES_PATH)
companies = spark.read.parquet(COMPANIES_PATH)
shareprices = spark.read.parquet(SHAREPRICES_PATH)

In [25]:
# Attach company metadata to each share-price row, then the sector via the company's
# industry. Joining on the column name (instead of `a.x == b.x`) keeps a single copy of
# the key column, so later references to SimFinId / IndustryId are not ambiguous.
df_1 = shareprices.join(companies, on="SimFinId")
df_2 = (
    df_1
    .join(industries, on="IndustryId")
    .select("Date", "Low", "High", "Company Name", "Sector", "Open", "Close")
)

# Per-row metrics:
#   Turmoil    - 100 * |High - Low| / Low   (range as a percentage of Low)
#   Net Change - 100 * (Close - Open) / Open (percentage move from Open to Close)
#   Avg Price  - mean of Open and Close
df_2 = (
    df_2
    .withColumn("Turmoil", 100 * f.abs(f.col("High") - f.col("Low")) / f.col("Low"))
    .withColumn("Net Change", 100 * (f.col("Close") - f.col("Open")) / f.col("Open"))
    .withColumn("Avg Price", (f.col("Close") + f.col("Open")) / 2)
)
value_cols = ["Turmoil", "Net Change", "Avg Price"]

In [26]:
# Per-Date averages of every metric across all companies, labelled "Total" in both the
# company and sector columns.
day_totals = (
    df_2
    .groupBy("Date")
    .agg(*[f.avg(col_name).alias(col_name) for col_name in value_cols])
    .withColumn("Company Name", f.lit("Total"))
    .withColumn("Sector", f.lit("Total"))
)
# Per-(Date, Sector) averages, labelled "<Sector> Total" in the company column.
sector_totals = (
    df_2
    .groupBy("Date", "Sector")
    .agg(*[f.avg(col_name).alias(col_name) for col_name in value_cols])
    .withColumn("Company Name", f.concat(f.col("Sector"), f.lit(" Total")))
)

In [27]:
# Stack the per-company rows with the day and sector totals; columns a side lacks
# (e.g. Low/High/Open/Close on the totals) are filled with nulls.
df = (
    df_2
    .unionByName(day_totals, allowMissingColumns=True)
    .unionByName(sector_totals, allowMissingColumns=True)
    # "YYYYMM" key (e.g. "201702") used to split the output into monthly datasets;
    # one formatting call replaces building Year/Month columns, concatenating, dropping.
    .withColumn("Year-Month", f.date_format("Date", "yyyyMM"))
)

year_months = sorted(
    row["Year-Month"] for row in df.select("Year-Month").distinct().collect()
)

In [28]:
# Write every month not yet in processed/, plus the most recently processed month again
# (presumably because it may have been incomplete when it was last written).
months_to_write = [
    ym for ym in year_months
    if ym not in processed_year_months or ym == last_processed_year_month
]

# Every monthly write would otherwise re-run the whole read/join/aggregate lineage, so
# keep `df` cached (DataFrame.cache spills to disk if needed) while the slices are written.
df.cache()
try:
    for year_month in months_to_write:
        print(f"Saving {year_month}...")
        month_df = df.where(f.col("Year-Month") == year_month)
        month_df.write.parquet(
            f"gs://{BUCKET_NAME}/processed/{year_month}_shares.parquet",
            mode="overwrite",
        )
finally:
    df.unpersist()

Saving 201703...
Saving 201704...
Saving 201705...
Saving 201706...
Saving 201707...
Saving 201708...
Saving 201709...
Saving 201710...
Saving 201711...
Saving 201712...
Saving 201801...
Saving 201802...
Saving 201803...
Saving 201804...
Saving 201805...
Saving 201806...
Saving 201807...
Saving 201808...
Saving 201810...
Saving 201811...
Saving 201812...
Saving 201901...
Saving 201902...
Saving 201904...
Saving 201905...
Saving 201906...
Saving 201907...
Saving 201908...
Saving 201909...
Saving 201910...
Saving 201911...
Saving 201912...
Saving 202001...
Saving 202002...
Saving 202003...
Saving 202004...
Saving 202005...
Saving 202008...
Saving 202010...
Saving 202012...
Saving 202102...
Saving 202103...
Saving 202104...
Saving 202105...
Saving 202106...
Saving 202107...
Saving 202108...
Saving 202109...
Saving 202111...
Saving 202112...
Saving 202201...
Saving 202202...
