<a href="https://colab.research.google.com/github/LuisFelipeUrena/supreme-fiesta/blob/main/EVPipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install snowflake-connector-python
!pip install "snowflake-connector-python[pandas]"
!pip install pyspark




In [2]:
import requests
from io import BytesIO

def download_file_in_memory(url):
    response = requests.get(url, stream=True)
    file_obj = BytesIO()

    for chunk in response.iter_content(chunk_size=8192):
        file_obj.write(chunk)

    file_obj.seek(0)  # Rewind the file object to the beginning
    return file_obj


In [19]:
import json
url = 'https://grnhse-use1-prod-s2-ghr.s3.amazonaws.com/temp_uploads/data/214/922/430/original/ElectricVehiclePopulationData.json?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAVQGOLGY36AIC4MO3%2F20240815%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20240815T223928Z&X-Amz-Expires=604800&X-Amz-SignedHeaders=host&X-Amz-Signature=2951df645023fcfbcb0f93ca3ef9fe73a87257b32e48d34d00d7c6d74fc05725'
file_obj = download_file_in_memory(url)
data = json.load(file_obj)
assert data is not None, "something is wrong"

In [4]:
data['data'][0]

['row-zt4k~iszy.uhv6',
 '00000000-0000-0000-62B4-C1BC527B773A',
 0,
 1676414233,
 None,
 1676414284,
 None,
 '{ }',
 '5YJ3E1EA8J',
 'San Diego',
 'Oceanside',
 'CA',
 '92051',
 '2018',
 'TESLA',
 'MODEL 3',
 'Battery Electric Vehicle (BEV)',
 'Clean Alternative Fuel Vehicle Eligible',
 '215',
 '0',
 None,
 '153998050',
 None,
 None,
 '06073018509',
 None,
 None,
 None]

In [20]:
from pyspark.sql import Row
from pyspark.sql import SparkSession
import re
def rename_column(column_name: str) -> str:
    """
    Renames a column name to be Snowflake-friendly.

    Snowflake column names:
    - Must start with a letter or underscore
    - Can contain letters, digits, or underscores
    - Cannot contain spaces or special characters (except for underscores)
    """
    # Replace spaces with underscores
    column_name = column_name.replace(" ", "_").lower()

    # Remove special characters (except underscores)
    column_name = re.sub(r'[^a-zA-Z0-9_]', '', column_name)

    # Ensure the column name starts with a letter or underscore
    if column_name[0].isdigit():
        column_name = f'__{column_name}'

    return column_name

def extract_and_clean_column_names(data):
    # Extracting the list of columns from the 'data' dictionary
    columns = data['meta']['view']['columns']

    # Create a Spark session
    spark = SparkSession.builder.appName("ColumnExtractor").getOrCreate()

    # Convert the list of columns into an RDD of Rows
    rdd = spark.sparkContext.parallelize(columns)

    # Map each Row to extract and clean 'name' and 'renderTypeName', with default values
    column_names_rdd = rdd.map(lambda x: (
        rename_column(x.get('name', '')),
        'NUMBER' if x.get('renderTypeName', '') == 'number' else 'TEXT'
    ))

    # Collect the results back to a list if needed
    cleaned_column_names = column_names_rdd.collect()

    return cleaned_column_names

In [21]:
data['meta']['view']['columns'][0:2]

[{'id': -1,
  'name': 'sid',
  'dataTypeName': 'meta_data',
  'fieldName': ':sid',
  'position': 0,
  'renderTypeName': 'meta_data',
  'format': {},
  'flags': ['hidden']},
 {'id': -1,
  'name': 'id',
  'dataTypeName': 'meta_data',
  'fieldName': ':id',
  'position': 0,
  'renderTypeName': 'meta_data',
  'format': {},
  'flags': ['hidden']}]

In [22]:
cols = extract_and_clean_column_names(data)
cols

[('sid', 'TEXT'),
 ('id', 'TEXT'),
 ('position', 'TEXT'),
 ('created_at', 'TEXT'),
 ('created_meta', 'TEXT'),
 ('updated_at', 'TEXT'),
 ('updated_meta', 'TEXT'),
 ('meta', 'TEXT'),
 ('vin_110', 'TEXT'),
 ('county', 'TEXT'),
 ('city', 'TEXT'),
 ('state', 'TEXT'),
 ('postal_code', 'TEXT'),
 ('model_year', 'TEXT'),
 ('make', 'TEXT'),
 ('model', 'TEXT'),
 ('electric_vehicle_type', 'TEXT'),
 ('clean_alternative_fuel_vehicle_cafv_eligibility', 'TEXT'),
 ('electric_range', 'NUMBER'),
 ('base_msrp', 'NUMBER'),
 ('legislative_district', 'NUMBER'),
 ('dol_vehicle_id', 'TEXT'),
 ('vehicle_location', 'TEXT'),
 ('electric_utility', 'TEXT'),
 ('__2020_census_tract', 'TEXT'),
 ('counties', 'NUMBER'),
 ('congressional_districts', 'NUMBER'),
 ('waofm__gis__legislative_district_boundary', 'NUMBER')]

In [23]:
type(cols)

list

In [24]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, StructType, StructField, IntegerType
from pyspark.sql.functions import col

def create_electric_car_objects(data, cols):
    # Create a Spark session
    spark = SparkSession.builder.appName("ElectricCarObjects").getOrCreate()

    # Define the schema based on the column names and their types
    schema = StructType([StructField(col[0], StringType(), True) for col in cols])
    # Create a DataFrame from the list of lists
    df = spark.createDataFrame(data, schema)
    # Convert the number type column to integers in spark
    for col_name, col_type in cols:
      if col_type == 'NUMBER':
          df = df.withColumn(col_name, col(col_name).cast('integer'))
    # convert back to == pandas
    df = df.toPandas()

    return df


In [25]:
df = create_electric_car_objects(data['data'], cols)
df.head(5)

Unnamed: 0,sid,id,position,created_at,created_meta,updated_at,updated_meta,meta,vin_110,county,...,electric_range,base_msrp,legislative_district,dol_vehicle_id,vehicle_location,electric_utility,__2020_census_tract,counties,congressional_districts,waofm__gis__legislative_district_boundary
0,row-zt4k~iszy.uhv6,00000000-0000-0000-62B4-C1BC527B773A,0,1676414233,,1676414284,,{ },5YJ3E1EA8J,San Diego,...,215,0,,153998050,,,6073018509,,,
1,row-5r58~kb8y.789r,00000000-0000-0000-B54E-F27AFFF902F6,0,1676414233,,1676414284,,{ },3FA6P0PU7H,Sedgwick,...,21,0,,138214331,POINT (-97.27013 37.54531),,20173009801,1291.0,,
2,row-84ix~3wif_u9ju,00000000-0000-0000-F67B-BBFF22B88E48,0,1676414233,,1676414298,,{ },1N4AZ0CP8D,Snohomish,...,75,0,38.0,3129059,POINT (-122.19388 48.15353),PUGET SOUND ENERGY INC,53061052805,3213.0,2.0,40.0
3,row-wiar-siae_sed9,00000000-0000-0000-0360-775CFE2EDAFF,0,1676414233,,1676414298,,{ },WBY8P8C58K,Kitsap,...,126,0,26.0,166525635,POINT (-122.62749 47.565),PUGET SOUND ENERGY INC,53035080500,848.0,6.0,33.0
4,row-abd5~finn.nzkg,00000000-0000-0000-3182-A2040CC92549,0,1676414233,,1676414298,,{ },5YJ3E1EA7K,Snohomish,...,220,0,32.0,475248315,POINT (-122.31768 47.87166),PUGET SOUND ENERGY INC,53061050800,3213.0,2.0,7.0


In [27]:
import snowflake.connector
from snowflake.connector.pandas_tools import write_pandas
import pandas as pd

def write_df_to_snowflake(df: pd.DataFrame, table_name: str, schema_name: str, database_name: str,
                          user: str, password: str, account: str, warehouse: str, role: str):
    # Establish the connection to Snowflake
    conn = snowflake.connector.connect(
        user=user,
        password=password,
        account=account,
        warehouse=warehouse,
        role=role,
        database=database_name,
        schema=schema_name
    )
    # Rename the columns (just in case)
    df.columns = [rename_column(col) for col in df.columns]

    try:
        # Write the DataFrame to the Snowflake table
        success, nchunks, nrows, _ = write_pandas(conn, df, table_name.upper(), quote_identifiers=True,auto_create_table=True, overwrite=True)

        if success:
            print(f"Successfully loaded {nrows} rows into table {table_name}.")
        else:
            print(f"Failed to load the DataFrame into table {table_name}.")

    finally:
        # Close the connection
        conn.close()

In [28]:
from google.colab import userdata
user = userdata.get('user')
password = userdata.get('pswd')
account = userdata.get('account')
warehouse = "COMPUTE_WH"
role = userdata.get('role')
database_name = "ELECTRICVEHICLES"
schema_name = "STAGING"
table_name = "ev_population_stg"

In [17]:
write_df_to_snowflake(
    df=df,
    table_name=table_name,
    schema_name=schema_name,
    database_name=database_name,
    user=user,
    password=password,
    account=account,
    warehouse=warehouse,
    role=role
)

Successfully loaded 22183 rows into table ev_population_stg.


In [18]:
!pip list

Package                          Version
-------------------------------- ---------------------
absl-py                          1.4.0
accelerate                       0.32.1
aiohappyeyeballs                 2.3.5
aiohttp                          3.10.3
aiosignal                        1.3.1
alabaster                        0.7.16
albucore                         0.0.13
albumentations                   1.4.13
altair                           4.2.2
annotated-types                  0.7.0
anyio                            3.7.1
argon2-cffi                      23.1.0
argon2-cffi-bindings             21.2.0
array_record                     0.5.1
arviz                            0.18.0
asn1crypto                       1.5.1
astropy                          6.1.2
astropy-iers-data                0.2024.8.12.0.32.58
astunparse                       1.6.3
async-timeout                    4.0.3
atpublic                         4.1.0
attrs                            24.2.0
audioread              