In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.context import SparkContext

In [1]:
import os
from prefect import task
from pyspark.sql import SparkSession, Window
from pyspark.sql import DataFrame as SparkDataFrame
import contextlib
import os
from prefect import flow
from pyspark import SparkConf,SparkContext
from prefect.blocks.system import Secret
import pandas as pd
from typing import List


In [None]:
# Local working directory. Override it with the ECONOMIC_DATA_PATH environment
# variable rather than editing a user-specific absolute path into the notebook.
# NOTE(review): PATH is not referenced by any other visible cell.
PATH = os.environ.get('ECONOMIC_DATA_PATH', '/Users/og/Desktop/')

In [3]:
@contextlib.contextmanager
def get_spark_session(json, app_name='economic_data',
                      gcs_connector_jar='/Users/og/Downloads/gcs-connector-hadoop2-2.2.15-shaded.jar'):
    """
    Context manager that yields a local SparkSession able to read ``gs://`` paths.

    The session is stopped when the ``with`` block exits. This also happens if an
    exception is raised inside the block, or while the Hadoop GCS settings are applied.

    Args:
      - json (str): Path to the GCP service-account JSON key file used for GCS auth.
      - app_name (str): Spark application name. Defaults to 'economic_data'.
      - gcs_connector_jar (str): Path to the shaded GCS connector jar, passed
        through ``spark.jars``.

    Yields:
      - SparkSession: the configured session.
    """
    json_path = json

    conf = (
        SparkConf()
        .setMaster('local[*]')
        .setAppName(app_name)
        .set("spark.jars", gcs_connector_jar)
        .set("spark.hadoop.google.cloud.auth.service.account.enable", "true")
        .set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", json_path)
    )

    # NOTE(review): getOrCreate() returns an already-running session if one exists;
    # static settings such as spark.jars may not take effect in that case.
    spark = SparkSession.builder.config(conf=conf).getOrCreate()

    try:
        # Register the GCS filesystem implementations and credentials with Hadoop.
        # This runs inside the try so a failure here still stops the session.
        hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
        hadoop_conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
        hadoop_conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
        hadoop_conf.set("fs.gs.auth.service.account.json.keyfile", json_path)
        hadoop_conf.set("fs.gs.auth.service.account.enable", "true")

        yield spark
    finally:
        spark.stop()


In [4]:
def retrieve_gcp_parquet(years: List[int], bucket: str, base_path: str, spark: SparkSession,
                         file_template: str = '{year}_California_city_economic_data.parquet'):
    """
    Read one parquet file per year from a GCS bucket.

    Args:
      - years (List[int]): Years to load; any iterable of ints works (e.g. ``range``).
      - bucket (str): GCS bucket name, without the ``gs://`` prefix.
      - base_path (str): Object prefix inside the bucket, e.g. 'data/economic/city/'.
        The trailing '/' is optional.
      - spark (SparkSession): Session used to read the files.
      - file_template (str): File-name pattern, formatted with ``year=<year>``.
        The default keeps the original California city economic naming scheme.

    Returns:
      - list: one Spark DataFrame per year, in the order ``years`` yields them.
    """
    # GCS object keys always use '/', independent of the host OS.
    import posixpath

    return [
        spark.read.parquet(
            f'gs://{bucket}/{posixpath.join(base_path, file_template.format(year=year))}'
        )
        for year in years
    ]

In [5]:


def union_dataframes(dfs: List[SparkDataFrame], by_name: bool = True) -> SparkDataFrame:
    """
    Stack the rows of several Spark DataFrames into one DataFrame.

    Args:
      - dfs (List[SparkDataFrame]): DataFrames to combine; any iterable works.
      - by_name (bool): If True (default), columns are paired by name with
        ``unionByName``. Yearly files whose columns appear in a different order
        therefore still line up, and the column names must match. If False, the
        positional ``union`` is used.

    Returns:
      - SparkDataFrame: the union of all inputs, in input order.

    Raises:
      - ValueError: if ``dfs`` contains no DataFrames.
    """
    frames = list(dfs)
    if not frames:
        raise ValueError('union_dataframes() needs at least one DataFrame')

    merged = frames[0]
    for df in frames[1:]:
        # union() pairs columns by position and would silently mix columns that
        # were reordered between files; unionByName() pairs them by name.
        merged = merged.unionByName(df) if by_name else merged.union(df)
    return merged



In [9]:
def data_pipeline(json_path, years=range(2020, 2021), bucket='de_final_project_bucket',
                  base_path='data/economic/city/', app_name='test'):
    """
    Load the yearly city economic parquet files from GCS, union them, and print a preview.

    Args:
      - json_path (str): Path to the GCP service-account JSON key used for GCS access.
      - years (Iterable[int]): Years to load. Defaults to ``range(2020, 2021)``, i.e. 2020 only.
      - bucket (str): GCS bucket that holds the parquet files.
      - base_path (str): Object prefix of the files inside the bucket.
      - app_name (str): Spark application name.

    Returns:
      - pandas.DataFrame: the combined data. get_spark_session stops the Spark
        session when the ``with`` block exits, and a Spark DataFrame cannot be
        used after that. The result is therefore collected with ``toPandas()``
        while the session is still alive. Keep ``years`` small enough for the
        data to fit in driver memory.
    """
    # Spark session for this run; it is stopped automatically when the block exits.
    with get_spark_session(json=json_path, app_name=app_name) as spark_session:
        dfs = retrieve_gcp_parquet(years=years,
                                   bucket=bucket,
                                   base_path=base_path,
                                   spark=spark_session)
        meta_df = union_dataframes(dfs)

        meta_df.show()
        # Materialise before the session stops (see Returns above).
        return meta_df.toPandas()


In [10]:

# Path to the GCP service-account key. Read the standard
# GOOGLE_APPLICATION_CREDENTIALS variable so the key's name and location are not
# committed with the notebook; fall back to the original relative file name.
# The variable is not called `json`, to avoid shadowing the stdlib module.
json_key_path = os.environ.get(
    'GOOGLE_APPLICATION_CREDENTIALS',
    'de-final-project-388703-0bbc1822d023.json',
)
meta_df = data_pipeline(json_key_path)


                                                                                

+----+-----------+--------------------+----------+-------------+----------------+-----------------+-------------+------------------+----------------+
|year|      state|                city|population|   median_age|household_income|per_capita_income|poverty_count|unemployment_count|employment_count|
+----+-----------+--------------------+----------+-------------+----------------+-----------------+-------------+------------------+----------------+
|2020| California|     Home Garden CDP|    1590.0|         26.3|         35197.0|          13888.0|        455.0|              88.0|           486.0|
|2020| California|    Home Gardens CDP|   12027.0|         34.1|         67716.0|          23281.0|       1759.0|             244.0|          5321.0|
|2020| California|        Homeland CDP|    7613.0|         33.9|         53008.0|          19935.0|        831.0|             324.0|          3130.0|
|2020| California|Homestead Valley CDP|    2619.0|         55.4|         33993.0|          21077.0| 

In [None]:
# df = spark.read.parquet('gs://de_final_project_bucket/data/demographic/city/*_Florida_city_demographic_data.parquet')

In [None]:
# Report how many columns the combined dataset exposes.
column_count = sum(1 for _col in meta_df.columns)
print('Column Count:', column_count)

In [None]:
# meta_df may be a pandas or a Spark DataFrame. pandas' DataFrame.count() returns
# per-column non-null counts rather than a row count, so use the index length there.
row_count = len(meta_df.index) if isinstance(meta_df, pd.DataFrame) else meta_df.count()
print('Row Count:', row_count)