In [1]:
# findspark must run before pyspark is imported: it locates the Spark install
# (SPARK_HOME) and adds its Python bindings to sys.path so the imports below resolve.
import findspark
findspark.init()

import psycopg2
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession

# Show which Spark installation findspark picked up.
findspark.find()

'/home/jwang/workspace/national_energy_impact/.venv/lib/python3.8/site-packages/pyspark'

In [2]:
# Start (or reuse) the Spark session. The PostgreSQL JDBC driver jar is put on
# Spark's classpath so DataFrames can later be written with format("jdbc").
spark = (
    SparkSession.builder
    .appName("Python Spark SQL basic example")
    .config('spark.jars', '/home/jwang/workspace/postgresql_jar/postgresql-42.6.0.jar')
    .getOrCreate()
)

23/08/01 12:14:21 WARN Utils: Your hostname, jwang resolves to a loopback address: 127.0.1.1; using 192.168.2.168 instead (on interface wlp0s20f3)
23/08/01 12:14:21 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
23/08/01 12:14:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
def db_connect(host, database, user, password, port, schema, table_name):
    """Prepare PostgreSQL for a fresh load of ``schema.table_name``.

    Connects to ``database``, creates ``schema`` if it does not exist and drops
    ``schema.table_name`` if it exists, then commits and closes the connection.

    Parameters
    ----------
    host, database, user, password, port
        Connection settings. They are passed to ``psycopg2.connect`` as separate
        keyword arguments rather than a ``postgresql://`` URI, so characters such
        as ``@``, ``/`` or ``:`` in the password need no URL escaping.
    schema, table_name
        Target schema and table, interpolated into the SQL unquoted. Unquoted
        names are case-folded by PostgreSQL; NOTE(review): this is kept on purpose
        so the dropped name matches the table the (unquoted) JDBC ``dbtable``
        write creates. Only pass trusted identifiers.
    """
    pg_conn = psycopg2.connect(host=host, dbname=database, user=user,
                               password=password, port=port)
    try:
        with pg_conn.cursor() as cur:
            cur.execute(f"CREATE SCHEMA IF NOT EXISTS {schema}")
            cur.execute(f"DROP TABLE IF EXISTS {schema}.{table_name}")
        pg_conn.commit()
    finally:
        # Release the connection even when one of the statements fails.
        pg_conn.close()


def process_station_data_and_rename_columns(drop_cols, file_name):
    """Read a 2022 station file, drop ``drop_cols`` and repair its mismatched headers.

    The file is read as pipe-delimited CSV with a header row. In the 2022 data
    five header names do not match the columns beneath them; they are renamed:

    - ``Travel_Lane`` -> ``Year_Record``
    - ``Travel_Dir``  -> ``Travel_Lane``
    - ``Station_Id``  -> ``Travel_Dir``
    - ``State_Code``  -> ``Station_Id``
    - ``Year_Record`` -> ``State_Code``
    """
    # Applied strictly in order. Travel_Lane is first parked under the scratch
    # name Year_Record1 so that Year_Record is free when the last rename runs.
    header_fixes = (
        ('Travel_Lane', 'Year_Record1'),
        ('Travel_Dir', 'Travel_Lane'),
        ('Station_Id', 'Travel_Dir'),
        ('State_Code', 'Station_Id'),
        ('Year_Record', 'State_Code'),
        ('Year_Record1', 'Year_Record'),
    )
    station = spark.read.csv(file_name, sep='|', header=True).drop(*drop_cols)
    for old_name, new_name in header_fixes:
        station = station.withColumnRenamed(old_name, new_name)
    return station

def process_station_data(drop_cols, file_name):
    """Read a pipe-delimited station file with a header row and drop ``drop_cols``.

    Returns the resulting Spark DataFrame.
    """
    return spark.read.csv(file_name, sep='|', header=True).drop(*drop_cols)


def load_data_to_postgres(df, schema, table_name, host, database, user, password, port):
    """Write Spark DataFrame ``df`` to the PostgreSQL table ``schema.table_name`` via JDBC.

    Uses the ``org.postgresql.Driver`` JDBC driver and Spark's default save mode
    (no ``mode`` is set), then prints a one-line confirmation naming the table.
    """
    target_table = f"{schema}.{table_name}"
    jdbc_options = {
        'driver': 'org.postgresql.Driver',
        'url': f"jdbc:postgresql://{host}:{port}/{database}",
        'dbtable': target_table,
        'user': user,
        'password': password,
    }
    writer = df.write.format("jdbc")
    for key, value in jdbc_options.items():
        writer = writer.option(key, value)
    writer.save()
    print("Add to postgresql: " + target_table)


In [4]:
def _split_fixed_width_record(record, field_specs):
    """Cut one fixed-width ``record`` (newline already removed) into a list of field strings.

    ``field_specs`` is a sequence of ``(name, width)`` pairs; only the widths are
    used. A width of 0 means "everything that is left", which is how the final,
    variable-length ``Station_Location`` column is described.
    """
    fields = []
    idx = 0
    for spec in field_specs:
        width = spec[1]
        if width > 0:
            fields.append(record[idx: idx + width])
            idx += width
        else:
            fields.append(record[idx:])
    return fields


def process_station_data_without_seperator(
        drop_cols, file_name, station_data_size,
        schema_file="../HPMS_Data_Unified_Name/HPMS_Data_Unified_Name/2021/2021 (TMAS).STA"):
    """Read a fixed-width station file (not separated by ``,`` or ``|``) into Spark.

    Parameters
    ----------
    drop_cols : iterable of str
        Columns removed from the result.
    file_name : str
        Path of the fixed-width station file to parse.
    station_data_size : sequence of (name, width) pairs
        Field layout, in file order; width 0 marks the trailing variable-length field.
    schema_file : str, optional
        Pipe-delimited station file with a header row whose Spark schema is
        reused for the parsed rows. Defaults to the 2021 national station file.

    Returns
    -------
    pyspark.sql.DataFrame
        One row per non-empty line of ``file_name``, minus ``drop_cols``.
    """
    columns = spark.read.csv(schema_file, sep='|', header=True).schema
    all_data = []
    # NOTE(review): errors='ignore' drops undecodable bytes, which would shift every
    # later fixed-width field on that line; kept for compatibility with the original.
    with open(file_name, 'r', errors='ignore') as station_file:
        for line in station_file:
            # Remove only a real newline: the last line of a file may not end in one.
            record = line.rstrip('\n')
            if not record:
                # Blank lines would otherwise become rows of empty strings.
                continue
            all_data.append(_split_fixed_width_record(record, station_data_size))
    station_df = spark.createDataFrame(data=all_data, schema=columns)
    return station_df.drop(*drop_cols)

In [5]:
def main():
    """Load TMAS station files for 2011-2022 into per-state PostgreSQL tables.

    For every year and every state in ``states_dict`` the table
    ``station.<STATE>_<YEAR>`` is dropped (if present) and written again:

    * 2022, 2020, 2019 and 2018 provide one station file per state; each file is
      read and written to its own table.
    * 2021 and 2011-2017 provide a single national station file; it is read once
      and split into per-state tables by ``State_Code``, using the state-name ->
      code lookup in ``state_code.csv``.

    The database password is read from the ``PGPASSWORD`` environment variable,
    or prompted for interactively, instead of being stored in the notebook.
    """
    import getpass
    import os

    host = 'localhost'
    database = 'national_volume_osm'
    user = 'postgres'
    # Never keep credentials in the notebook (they end up in outputs and git history).
    # NOTE(review): the password previously hard-coded here should be rotated.
    password = os.environ.get('PGPASSWORD') or getpass.getpass('PostgreSQL password: ')
    port = 5432
    drop_cols = ('Sample_Type_Volume', 'Num_Lanes_Volume', 'Method_Volume', 'Sample_Type_Class',
                 'Num_Lanes_Class', 'Method_Class', 'Algorithm_Volume', 'Sample_Type_Truck',
                 'Num_Lanes_Truck', 'Method_Truck', 'Data_Retrieval', 'Primary_Purpose',
                 'LRS_Id', 'LRS_Point', 'SHRP_Id', 'Is_Sample', 'Sample_Id', 'Con_Route_Signing',
                 'Con_Signed_Route')

    states_dict = {
            'AK': 'Alaska',
            'AL': 'Alabama',
            'AR': 'Arkansas',
            'AZ': 'Arizona',
            'CA': 'California',
            'CO': 'Colorado',
            'CT': 'Connecticut',
            'DC': 'District of Columbia',
            'DE': 'Delaware',
            'FL': 'Florida',
            'GA': 'Georgia',
            'HI': 'Hawaii',
            'IA': 'Iowa',
            'ID': 'Idaho',
            'IL': 'Illinois',
            'IN': 'Indiana',
            'KS': 'Kansas',
            'KY': 'Kentucky',
            'LA': 'Louisiana',
            'MA': 'Massachusetts',
            'MD': 'Maryland',
            'ME': 'Maine',
            'MI': 'Michigan',
            'MN': 'Minnesota',
            'MO': 'Missouri',
            'MS': 'Mississippi',
            'MT': 'Montana',
            'NC': 'North Carolina',
            'ND': 'North Dakota',
            'NE': 'Nebraska',
            'NH': 'New Hampshire',
            'NJ': 'New Jersey',
            'NM': 'New Mexico',
            'NV': 'Nevada',
            'NY': 'New York',
            'OH': 'Ohio',
            'OK': 'Oklahoma',
            'OR': 'Oregon',
            'PA': 'Pennsylvania',
            'RI': 'Rhode Island',
            'SC': 'South Carolina',
            'SD': 'South Dakota',
            'TN': 'Tennessee',
            'TX': 'Texas',
            'UT': 'Utah',
            'VA': 'Virginia',
            'VT': 'Vermont',
            'WA': 'Washington',
            'WI': 'Wisconsin',
            'WV': 'West Virginia',
            'WY': 'Wyoming'
    }
    # Fixed-width layout of the non-delimited station files: (column, width).
    # Width 0 marks the trailing variable-length column.
    station_data_size = (
        ('Record_Type', 1),
        ('State_Code', 2),
        ('Station_Id', 6),
        ('Travel_Dir', 1),
        ('Travel_Lane', 1),
        ('Year_Record', 2),
        ('F_System', 2),
        ('Num_Lanes', 1),
        ('Sample_Type_Volume', 1),
        ('Num_Lanes_Volume', 1),
        ('Method_Volume', 1),
        ('Sample_Type_Class', 1),
        ('Num_Lanes_Class', 1),
        ('Method_Class', 1),
        ('Algorithm_Volume', 1),
        ('Num_Classes', 2),
        ('Sample_Type_Truck', 1),
        ('Num_Lanes_Truck', 1),
        ('Method_Truck', 1),
        ('Calibration', 1),
        ('Data_Retrieval', 1),
        ('Type_Sensor_1', 1),
        ('Type_Sensor_2', 1),
        ('Primary_Purpose', 1),
        ('LRS_Id', 12),
        ('LRS_Point', 6),
        ('Latitude', 8),
        ('Longitude', 9),
        ('SHRP_Id', 4),
        ('Prev_Station_Id', 6),
        ('Year_Established', 2),
        ('Year_Discontinued', 2),
        ('County_Code', 3),
        ('Is_Sample', 1),
        ('Sample_Id', 12),
        ('NHS', 1),
        ('Posted_Route_Signing', 1),
        ('Posted_Signed_Route', 8),
        ('Con_Route_Signing', 1),
        ('Con_Signed_Route', 8),
        ('Station_Location', 0)
    )
    states = list(states_dict.keys())
    schema_sta = 'station'
    data_root = "../HPMS_Data_Unified_Name/HPMS_Data_Unified_Name"
    # Years whose station data ships as one file per state.
    per_state_file_years = {2022, 2020, 2019, 2018}

    # Resolve state name -> code once, instead of running a Spark filter/collect
    # job for every (year, state) pair.
    state_code = spark.read.csv('state_code.csv', header=True)
    code_by_state_name = {row['state']: row['code'] for row in state_code.collect()}

    for year in range(2022, 2010, -1):
        if year in per_state_file_years:
            for state in states:
                file_name = f"{data_root}/{year}/{year}_station_data/{state}_{year} (TMAS).STA"
                table_name = f"{state}_{year}"
                db_connect(host, database, user, password, port, schema_sta, table_name)
                if year == 2022:
                    # 2022 files are pipe-delimited but carry mismatched headers.
                    station = process_station_data_and_rename_columns(drop_cols, file_name)
                else:
                    # 2018-2020 files are fixed-width.
                    station = process_station_data_without_seperator(drop_cols, file_name, station_data_size)
                load_data_to_postgres(station, schema_sta, table_name, host, database, user, password, port)
        else:
            # 2021 and 2011-2017: one national file, split by State_Code below.
            file_name = f"{data_root}/{year}/{year} (TMAS).STA"
            if year == 2021:
                all_station = process_station_data(drop_cols, file_name)
            else:
                all_station = process_station_data_without_seperator(drop_cols, file_name, station_data_size)
            # The national frame is filtered once per state; cache it so the source
            # is not re-read and re-parsed for every per-state write.
            all_station = all_station.cache()
            try:
                for state in states:
                    table_name = f"{state}_{year}"
                    db_connect(host, database, user, password, port, schema_sta, table_name)
                    code = code_by_state_name[states_dict[state]]
                    station = all_station.filter(all_station.State_Code == code)
                    load_data_to_postgres(station, schema_sta, table_name, host, database, user, password, port)
            finally:
                all_station.unpersist()


In [6]:
# Run the full station load. Inside a Jupyter kernel __name__ is "__main__",
# so executing this cell triggers main() just as running the file as a script would.
if __name__ == "__main__":
    main()

23/08/01 12:14:25 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


Add to postgresql: station.AK_2022
Add to postgresql: station.AL_2022
Add to postgresql: station.AR_2022
Add to postgresql: station.AZ_2022
Add to postgresql: station.CA_2022
Add to postgresql: station.CO_2022
Add to postgresql: station.CT_2022
Add to postgresql: station.DC_2022
Add to postgresql: station.DE_2022
Add to postgresql: station.FL_2022
Add to postgresql: station.GA_2022
Add to postgresql: station.HI_2022
Add to postgresql: station.IA_2022
Add to postgresql: station.ID_2022
Add to postgresql: station.IL_2022
Add to postgresql: station.IN_2022
Add to postgresql: station.KS_2022
Add to postgresql: station.KY_2022
Add to postgresql: station.LA_2022
Add to postgresql: station.MA_2022
Add to postgresql: station.MD_2022
Add to postgresql: station.ME_2022
Add to postgresql: station.MI_2022
Add to postgresql: station.MN_2022
Add to postgresql: station.MO_2022
Add to postgresql: station.MS_2022
Add to postgresql: station.MT_2022
Add to postgresql: station.NC_2022
Add to postgresql: s