In [3]:
# Create Spark Session
from pyspark.sql import SparkSession

# getOrCreate() hands back the already-running session when there is one,
# so re-running this cell does not start a second session.
spark = (
    SparkSession.builder
    .appName("Read Voter File Data")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/17 23:25:52 INFO SparkEnv: Registering MapOutputTracker
24/03/17 23:25:52 INFO SparkEnv: Registering BlockManagerMaster
24/03/17 23:25:52 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
24/03/17 23:25:52 INFO SparkEnv: Registering OutputCommitCoordinator


In [5]:
""" Use these functions to [read/write] [parquet files/data frames] [from/to] Google Cloud"""

from pyspark.sql import SparkSession

def read_parquet_file(spark_session, file_path):
    """
    Reads a Parquet file and returns a Spark DataFrame.

    Parquet stores column names and types inside the file, so the CSV-only
    reader options "header" and "inferSchema" are not passed: the Parquet
    source does not use them.

    Parameters:
    spark_session (SparkSession): An active SparkSession.
    file_path (str): The path to the Parquet file.

    Returns:
    DataFrame: A Spark DataFrame containing the data from the Parquet file.
    """
    return spark_session.read.parquet(file_path)


def write_df_to_gcs_parquet(spark_session, df, bucket_name, file_path):
    """
    Saves a Spark DataFrame as Parquet in a Google Cloud Storage bucket.

    The output location is gs://<bucket_name>/data_cleaned/<file_path>.
    The write uses mode "overwrite", so existing output at that location
    is replaced.

    Parameters:
    spark_session (SparkSession): Not used by this function; kept so existing calls keep working.
    df (DataFrame): The Spark DataFrame to write.
    bucket_name (str): The name of the Google Cloud Storage bucket.
    file_path (str): Path below the bucket's data_cleaned/ folder for the output.
    """
    destination = f"gs://{bucket_name}/data_cleaned/{file_path}"
    df.write.mode("overwrite").parquet(destination)
    

def list_content(bucket_name):
    """
    Prints the name of every blob in the bucket, one name per line.

    Parameters:
    bucket_name (str): The name of the Google Cloud Storage bucket.
    """
    # Imported locally because the notebook only imports `storage` in a later
    # cell; without this, calling the function before that cell has run
    # raises NameError.
    from google.cloud import storage

    storage_client = storage.Client()
    for blob in storage_client.list_blobs(bucket_name):
        print(blob.name)

In [None]:
# Notebook-wide settings
class_bucket_name = "winter-2024-voter-file"   # bucket that is scanned for datasets
local_bucket_name = "pstat135-final-project1"  # bucket that receives the cleaned output
class_folder_name = "VM2Uniform"               # folder prefix inside class_bucket_name

# Datasets for trying code out before running it against every dataset.
sample_parquet_files = [
    "VM2Uniform--AK--2021-02-03",
    "VM2Uniform--AL--2021-02-04",
    "VM2Uniform--AR--2021-03-16",
]

In [None]:
from google.cloud import storage

def list_filtered_blobs(bucket_name, folder_name):
    """
    Returns the dataset names under a folder that have a '_SUCCESS' marker blob.

    Every blob below `folder_name/` whose name ends with '_SUCCESS' contributes
    the first path component that follows the folder prefix. For example,
    'VM2Uniform/VM2Uniform--AK--2021-02-03/_SUCCESS' yields
    'VM2Uniform--AK--2021-02-03'.

    Parameters:
    bucket_name (str): The name of the Google Cloud Storage bucket.
    folder_name (str): Folder inside the bucket to scan; it may contain '/'.

    Returns:
    list[str]: Unique dataset names, in the order they were first listed.
    """
    marker = "_SUCCESS"
    prefix = folder_name + '/'
    storage_client = storage.Client()

    filtered_blobs = []
    seen = set()  # constant-time duplicate check; the list keeps listing order

    for blob in storage_client.list_blobs(bucket_name, prefix=prefix):
        if not blob.name.endswith(marker):
            continue

        # Cut off the folder prefix and only the trailing marker, so a
        # '_SUCCESS' substring elsewhere in the path is left untouched and a
        # folder_name containing '/' still resolves to the right component.
        relative_name = blob.name[len(prefix):-len(marker)]
        cleaned_name = relative_name.split('/', 1)[0]

        # A marker stored directly in the folder has no dataset component;
        # skipping it avoids returning an empty name.
        if cleaned_name and cleaned_name not in seen:
            seen.add(cleaned_name)
            filtered_blobs.append(cleaned_name)

    return filtered_blobs

def aggregate_dataset(files, bucket_name="winter-2024-voter-file", folder_name="VM2Uniform",
                      min_age=18, max_age=29, fraction=0.1, seed=None):
    """
    Reads several Parquet datasets, keeps an age band, samples it and unions the results.

    For each name in `files` the dataset at gs://<bucket_name>/<folder_name>/<name>
    is read with read_parquet_file (using the notebook-level `spark` session),
    filtered to min_age <= Voters_Age <= max_age, sampled without replacement,
    and appended to the result with DataFrame.union.

    Parameters:
    files (iterable of str): Dataset names below the folder.
    bucket_name (str): Bucket that holds the datasets.
    folder_name (str): Folder inside the bucket that holds the datasets.
    min_age (int): Lowest age kept (inclusive).
    max_age (int): Highest age kept (inclusive).
    fraction (float): Sampling fraction passed to DataFrame.sample.
    seed (int or None): Seed passed to DataFrame.sample; None leaves the choice to Spark.

    Returns:
    DataFrame: The union of the sampled subsets, in the order of `files`.

    Raises:
    ValueError: If `files` is empty.
    """
    files = list(files)
    if not files:
        # Previously this surfaced as an IndexError on an empty list.
        raise ValueError("aggregate_dataset() needs at least one dataset name")

    aggregate_df = None
    for file in files:
        df = read_parquet_file(spark, f"gs://{bucket_name}/{folder_name}/{file}")
        subset_df = df.filter((df.Voters_Age >= min_age) & (df.Voters_Age <= max_age))
        sampled_df = subset_df.sample(withReplacement=False, fraction=float(fraction), seed=seed)

        # NOTE(review): union() matches columns by position, so this assumes
        # every dataset has the same column order — confirm for the full data.
        aggregate_df = sampled_df if aggregate_df is None else aggregate_df.union(sampled_df)

    return aggregate_df

In [None]:
# Dataset names under the class folder that have a _SUCCESS marker.
success_files = list_filtered_blobs(class_bucket_name, class_folder_name)

# Names look like 'VM2Uniform--AK--2021-02-03'; keep the second '--' field.
states = [dataset_name.split('--')[1] for dataset_name in success_files]

In [None]:
# Show the dataset names returned by list_filtered_blobs.
print(success_files)

In [None]:
# Show the second '--'-separated field of each dataset name (e.g. 'AK').
print(states)

In [None]:
# Build the filtered, sampled union over every dataset listed in success_files.
combined_df = aggregate_dataset(success_files)

In [None]:
# count() is a Spark action: it executes the read/filter/sample/union plan.
combined_df.count()

In [None]:
# Columns carried over into the exported dataset, in output order.
final_df_columns = [
    "Voters_FirstName",
    "Voters_LastName",
    "LALVOTERID",
    "Voters_Age",
    "Voters_Gender",
    "General_2020",
    "Primary_2020",
    "Ethnic_Description",
    "PresidentialPrimary_2020",
    "EthnicGroups_EthnicGroup1Desc",
    "Voters_StateVoterID",
    "CommercialData_Education",
    "CommercialData_EstHomeValue",
    "CommercialData_EstimatedHHIncome",
]

final_df = combined_df.select(*final_df_columns)

In [None]:
# Writes to gs://<local_bucket_name>/data_cleaned/dataset_young_demographic,
# replacing existing output at that location (the writer uses mode "overwrite").
write_df_to_gcs_parquet(spark, final_df, local_bucket_name, "dataset_young_demographic")

In [None]:
# Peek at the raw home-value column; truncate=False prints full cell contents.
final_df.select("CommercialData_EstHomeValue").show(truncate=False)

In [None]:
# Debug check: print the Python type of final_df.
print(type(final_df))

In [None]:
# Same aggregation, restricted to the datasets listed in sample_parquet_files.
test_df = aggregate_dataset(sample_parquet_files)

In [None]:
# count() is a Spark action: it executes the plan behind test_df.
test_df.count()

In [None]:
from pyspark.sql.functions import col, avg

# Add an integer version of the home-value column next to the original.
# NOTE(review): a later cell strips "$" before casting, so this direct cast
# presumably yields nulls for "$"-prefixed strings; confirm with the
# side-by-side show() further down.
numerical_df = test_df.withColumn(
    "CommercialData_EstHomeValue_in_Dollars",
    col("CommercialData_EstHomeValue").cast("int"),  # or "double" for floating-point numbers
)

# Cell output: the (name, type) pair of the new column.
numerical_df.select("CommercialData_EstHomeValue_in_Dollars").dtypes

In [None]:
# Type of the original column, for comparison with the cast column.
test_df.select("CommercialData_EstHomeValue").dtypes

In [None]:
# Mean of the cast column. avg() ignores nulls, and first()[0] pulls the
# single value out of the one-row result.
average_value = numerical_df.select(avg("CommercialData_EstHomeValue_in_Dollars")).first()[0]

In [None]:
# Display the mean computed from numerical_df.
print(average_value)

In [None]:
# Show the raw values and the cast values side by side.
numerical_df.select("CommercialData_EstHomeValue", "CommercialData_EstHomeValue_in_Dollars").show(truncate=False)

In [None]:
from pyspark.sql.functions import regexp_replace
# Remove "$" characters so the remaining text can be parsed, then cast to float.
# The original line was an unfilled template: `df` and `FloatType` were never
# defined and "your_column_name" was a placeholder. It now uses test_df and the
# home-value column, and the string type name "float" needs no extra import.
df_with_numerical = test_df.withColumn(
    "numerical_column",
    regexp_replace("CommercialData_EstHomeValue", "\\$", "").cast("float"),
)

In [None]:
from pyspark.sql.functions import regexp_replace
# Replace the derived column with one computed after removing "$" characters,
# so the cast to int sees only the remaining text.
numerical_df_select = numerical_df.withColumn(
    "CommercialData_EstHomeValue_in_Dollars",
    regexp_replace("CommercialData_EstHomeValue", "\\$", "").cast("int"),
)

In [None]:
# Show the raw values next to the values parsed after removing "$".
numerical_df_select.select("CommercialData_EstHomeValue", "CommercialData_EstHomeValue_in_Dollars").show(truncate=False)

In [None]:
# Mean of the re-parsed column; this rebinds average_value from the earlier cell.
average_value = numerical_df_select.select(avg("CommercialData_EstHomeValue_in_Dollars")).first()[0]

In [None]:
# Display the mean computed from numerical_df_select.
print(average_value)

In [None]:
# toPandas() collects the selected column to the driver as a pandas DataFrame,
# so the data must fit in driver memory.
pandas_df = numerical_df_select.select("CommercialData_EstHomeValue_in_Dollars").toPandas()

In [None]:
# Median of each column, computed by pandas (missing values are skipped by default).
pandas_df.median()

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt

# Histogram with a KDE overlay of the numeric home-value column in pandas_df.
fig, ax = plt.subplots(figsize=(10, 6))
sns.histplot(
    pandas_df['CommercialData_EstHomeValue_in_Dollars'],
    kde=True,
    stat="percent",
    ax=ax,
)

# Only the 0 to 1,000,000 range is displayed; values outside it are cut off.
ax.set_xlim(0, 1000000)
ax.set_title('Distribution of CommercialData_EstHomeValue_in_Dollars')
ax.set_xlabel('Value')
# stat="percent" puts percentages on the y-axis, not counts.
ax.set_ylabel('Percent')
plt.show()

In [None]:
# Peek at the raw household-income column; truncate=False prints full cell contents.
final_df.select("CommercialData_EstimatedHHIncome").show(truncate=False)