In [6]:
from bs4 import BeautifulSoup
import requests
from io import BytesIO
from urllib.request import urlopen
from zipfile import ZipFile
import pandas as pd
import os
import psycopg2
from sqlalchemy import create_engine
import string
import time

In [2]:
# Settings that control which FARS files are downloaded and which tables are written.
configs = {
    # download URL
    "url": 'https://www.nhtsa.gov/file-downloads?p=nhtsa/downloads/FARS/',
    # first year for which data should be downloaded
    "start_year": 2000,
    # include the national data
    "national": False,
    # include the data from Puerto Rico
    "puerto_rico": False,
    # include the auxiliary data
    "auxiliary": False,
    # path where the data is stored
    "data_path": './data/',
    # determines whether all tables are to be written
    "write_all": False,
    # list of tables to write when write_all is False; every entry needs the .csv ending
    "selected_tables": ['vehicle.csv'],
    # determines whether an existing table with the same name is dropped before writing
    "delete_existing": True,
    # first year from which data is written to the database
    "start_year_writing": 2000,
}

In [3]:
# Fetch the FARS download page and collect every <a> element inside its first <tbody>.
page_response = requests.get(configs['url'])
site = page_response.text
soup = BeautifulSoup(site, 'html.parser')
table_body = soup.find("tbody")
hyperlinks = table_body.findChildren("a")

In [4]:
# Collect, in first-seen order and without duplicates, every year that is
# listed on the page and is not earlier than configs['start_year'].
years_found = []
for anchor in hyperlinks:
    # characters 38-41 of the href are expected to hold the four-digit year
    year_text = anchor['href'][38:42]
    if not year_text.isnumeric():
        continue
    if int(year_text) < configs['start_year']:
        continue
    if year_text not in years_found:
        years_found.append(year_text)

# hyperlink_list now contains one link to the download area per collected year
hyperlink_list = [configs['url'] + year_text + "/" for year_text in years_found]

In [5]:
# replaces has the structure (table, columns, years, to_replace, replace)
# table       the table to alter
# columns     the columns to alter, as an iterable (length one for a single column)
# years       an iterable of all years the replacement applies to;
#             a first element of 0 makes the entry apply to every year
# to_replace  the value to be replaced
# replace     the replacing value
replaces = [
    ('vehicle', ['trav_sp'], list(range(2000, 2009)), 98, 998),
    ('vehicle', ['trav_sp'], list(range(2000, 2009)), 99, 999),
]


In [6]:
# modifications has the structure (table, columns, years, alteration, upper_bound)
# table        the table to alter
# columns      the columns to alter, as an iterable
# years        an iterable of all years the alteration applies to;
#              a first element of 0 makes the entry apply to every year
# alteration   the mathematical expression to apply, given as a string that
#              contains x, e.g. inch to cm: "x * 2.54"
# upper_bound  only values below this bound are altered; values at or above it
#              are left unchanged, e.g. to keep codes for unknown values intact
modifications = [
    (
        'vehicle',
        ['trav_sp'],
        [0],
        'round(x*1.609,0)',
        250,
    ),
]

In [24]:
# deletes has the structure (table, columns, years, lower_bound, upper_bound, replace)
# table        the table to alter
# columns      the columns to alter, as an iterable
# years        an iterable of all years the alteration applies to;
#              a first element of 0 makes the entry apply to every year
# lower_bound  lower end of the value range that is replaced (the bound itself included)
# upper_bound  upper end of the value range that is replaced (the bound itself included)
# replace      the replacing value, None if no numerical value is wanted
deletes = [
    (
        'vehicle',
        ['dr_hgt'],
        [0],
        90,
        107,
        None,
    ),
]

In [7]:
def download(link_list,path):
    """Download every zip archive in link_list and unpack it below path.

    link_list -- list of hyperlinks (strings) to zip files
    path      -- string prefix of the target directory; the four-digit year is
                 appended to it to form the directory an archive is unpacked to

    The year is read from the '/FARS/<year>/' part of each link, so the target
    directory stays correct even when the links are not sorted or a year is
    missing. Only when a link carries no such part is the year guessed from
    the position in the list, counting up from configs['start_year'].
    """
    import re  # local import: the year pattern is only needed here

    for position, link in enumerate(link_list):
        year_match = re.search(r'/FARS/(\d{4})/', link)
        if year_match:
            year = year_match.group(1)
        else:
            # fallback: assume consecutive years starting at start_year
            year = str(configs['start_year'] + position)
        with urlopen(link) as zipresp:
            # the archive is held in memory and unpacked from there
            with ZipFile(BytesIO(zipresp.read())) as zfile:
                zfile.extractall(path + year)

In [8]:
# Downloads all files selected in the configs dictionary.
# The links to the zip files are built from hyperlink_list, one group of
# files at a time; the link list is reset before every group.
temp_list = []
# (group enabled?, folder below the year, file name ending, target directory)
download_groups = [
    (configs['national'],
     'National/', 'NationalCSV.zip', "data/standard/national/"),
    (configs['puerto_rico'],
     'Puerto%20Rico/', 'PuertoRicoCSV.zip', "data/standard/puerto_rico/"),
    (configs['auxiliary'],
     'National/', 'NationalAuxiliaryCSV.zip', "data/auxiliary/national/"),
    (configs['auxiliary'] and configs['puerto_rico'],
     'Puerto%20Rico/', 'PuertoRicoAuxiliaryCSV.zip', "data/auxiliary/puerto_rico/"),
]
for wanted, region_folder, file_ending, target_dir in download_groups:
    temp_list = []
    if not wanted:
        continue
    for i in hyperlink_list:
        # i ends with '<year>/', so i[-5:-1] is the four-digit year
        zip_link = i + region_folder + 'FARS' + i[-5:-1] + file_ending
        # turn the link of the download page into the link of the file itself
        temp_list.append(zip_link.replace('www','static').replace('file-downloads?p=',''))
    download(temp_list, target_dir)

In [9]:
def add_to_database(dataframe,table,con_engine):
    """Write dataframe into the database table named table.

    dataframe  -- the pandas DataFrame to store
    table      -- name of the target table
    con_engine -- database connection/engine handed to pandas

    The data is first appended to the table; pandas creates the table when it
    does not exist yet. If appending fails (for example because the dataframe
    has a column the table lacks), the whole table is read back, concatenated
    with dataframe and written again, replacing the existing table.
    """
    try:
        # this will fail if there is a new column
        dataframe.to_sql(name=table, con=con_engine, if_exists='append', index=False)
    except Exception:
        # Exception instead of a bare except: an interrupt (Ctrl-C) during a
        # long append must not start a full rewrite of the table.
        # NOTE: the table name is concatenated into the query unescaped, so it
        # must come from a trusted source.
        existing = pd.read_sql('SELECT * FROM '+table, con_engine)
        combined = pd.concat([existing, dataframe])
        combined.to_sql(name=table, con=con_engine, if_exists='replace', index=False)

In [10]:
def get_filepaths():
    """Return the paths of all files found anywhere below configs['data_path']."""
    return [
        os.path.join(folder, filename)
        for folder, _subfolders, filenames in os.walk(configs['data_path'])
        for filename in filenames
    ]

In [11]:
def timestamp(row):
    """Build a pandas Timestamp from one row of the accident table.

    row -- mapping with the keys 'year', 'month', 'day', 'hour' and 'minute'

    Values outside the valid range (presumably the codes for unknown values --
    confirm against the FARS manual) are replaced by the first valid value:
    day -> 1, hour -> 0, minute -> 0.
    """
    # the 31st is a valid day of the month and must be kept
    day = row['day'] if row['day'] <= 31 else 1
    hour = row['hour'] if row['hour'] < 24 else 0
    minute = row['minute'] if row['minute'] < 60 else 0
    return pd.Timestamp(row['year'], row['month'], day, hour, minute)

In [12]:
def primarykey(pk_df,information):
    """Add keys that are unique across all years to pk_df and return the result.

    pk_df       -- dataframe of one file (one table of one year); new columns
                   are added to it in place before the result is returned
    information -- sequence whose first element is the year of the data and
                   whose second element is the table name

    The identifiers in the underlying data are reused every year, therefore
    the year is built into the generated keys.
    """
    if information[1] == 'accident':
        # the accident table additionally gets a timestamp column
        pk_df['TIMESTAMP'] = pk_df.apply(timestamp, axis=1)

    if 'st_case' in pk_df.columns:
        # st_case is added to the year followed by six zeroes, which gives a
        # ten-digit number starting with the year in which the data was gathered
        year_part = information[0]*1000000
        pk_df['primary_key_case'] = pk_df['st_case'] + year_part
        pk_df = change_col_position(['primary_key_case'], 0, pk_df)

    has_vehicle = 'veh_no' in pk_df.columns
    has_person = 'per_no' in pk_df.columns
    # uni_id = case key, then three digits for the vehicle, then three for the person
    if has_vehicle and has_person:
        pk_df = pk_df.assign(uni_id = lambda x : (x['primary_key_case']*1000000 + x['veh_no']*1000 + x['per_no']))
    elif has_vehicle:
        pk_df = pk_df.assign(uni_id = lambda x : (x['primary_key_case']*1000000 + x['veh_no']*1000))
    elif has_person:
        # there is no veh_no column in this branch, so the vehicle part stays 0
        pk_df = pk_df.assign(uni_id = lambda x : (x['primary_key_case']*1000000 + x['per_no']))

    return pk_df

In [13]:
def change_col_position(colnames,position,dataframe):
    """Return dataframe with the columns in colnames moved to index position.

    colnames  -- the names of the columns to move, in the order they should
                 appear in the result
    position  -- integer index at which the moved columns are inserted
    dataframe -- the dataframe whose columns are rearranged
    """
    order = [*dataframe.columns]
    for name in colnames:
        order.remove(name)
    # inserting back to front at the same index keeps the order of colnames
    for name in reversed(colnames):
        order.insert(position, name)
    rearranged = dataframe[order]
    return rearranged


In [14]:
def delete_table(information,engine):
    """Drop the table named information[1] if it exists.

    information -- sequence whose second element is the table name
    engine      -- database engine/connection used to run the statement

    A failure is reported on the console instead of being raised, so the
    caller continues either way.
    """
    table = information[1]
    # NOTE: the table name is concatenated into the statement unescaped, so it
    # must come from a trusted source.
    sql = 'DROP TABLE IF EXISTS ' + table
    try:
        if hasattr(engine, 'execute'):
            engine.execute(sql)
        else:
            # SQLAlchemy 2.0 engines have no execute(); run the statement on a
            # connection inside a transaction that is committed on success
            from sqlalchemy import text
            with engine.begin() as connection:
                connection.execute(text(sql))
    except Exception as error:
        # DROP TABLE IF EXISTS does not fail for a missing table, so ending up
        # here means the statement itself could not be run
        print('table '+table + ' could not be deleted: ' + str(error))
    else:
        print('table '+table + ' deleted')

In [None]:
# engine for the database connection
database_url = "postgresql+psycopg2://postgres:admin@localhost/NHTSA_FARS_NATIONAL"
engine = create_engine(database_url)

# name_set will hold every unique file name
name_set = set()

# paths is a list with the path of every file in every directory below the data directory
paths = get_filepaths()

In [14]:


# Collect every distinct file name; names are lower-cased because their
# capitalization differs between years. os.path is used instead of splitting
# on a backslash so that this also works where the path separator is '/'.
for i in paths:
    name_set.add(os.path.basename(i).lower())

name_list = sorted(name_set)

if not configs['write_all']:
    name_list = configs['selected_tables']

# Number of cells (rows * columns) above which a table is inserted into the
# database in slices instead of in one go; needs adaptation for different machines.
max_df_elements = 80000000

for i in name_list:
    # paths of all csv files of this table; the whole file name is compared to
    # avoid confusing tables whose names merely end alike
    same_table_path_list = [x for x in paths if os.path.basename(x).lower() == i.lower()]
    if not same_table_path_list:
        print('no files found for ' + i)
        continue

    frame_list = []
    for j in same_table_path_list:
        # information is a list of 2 elements: first the year of the file
        # (the name of the directory containing it), second the table name
        information = [
            int(os.path.basename(os.path.dirname(j))),
            os.path.basename(j).split(".")[0].lower(),
        ]
        print(information)

        # low_memory=True leads to a warning, therefore it is set to False
        df_each_year = pd.read_csv(j, low_memory=False, encoding='latin_1')

        # column names are not consistent in capitalization across the years
        df_each_year.columns = df_each_year.columns.str.lower()

        # apply every replace instruction meant for this table and year
        for table, columns, years, to_replace, replacement in replaces:
            if information[1] == table and (information[0] in years or years[0] == 0):
                for column in columns:
                    df_each_year[column] = df_each_year[column].replace(to_replace, replacement)

        # replace every value inside the closed range [lower_bound, upper_bound]
        for table, columns, years, lower_bound, upper_bound, replacement in deletes:
            if information[1] == table and (information[0] in years or years[0] == 0):
                for column in columns:
                    df_each_year[column] = df_each_year[column].apply(
                        lambda x: x if x < lower_bound or x > upper_bound else replacement)

        # apply the modifications to the corresponding tables in the corresponding years
        for table, columns, years, alteration, upper_bound in modifications:
            if information[1] == table and (information[0] in years or years[0] == 0):
                # The expression is compiled once per instruction instead of
                # once per cell. eval is acceptable here only because the
                # modifications list is defined in this file and not user input.
                alter = eval('lambda x: (' + alteration + ') if x < ' + str(upper_bound) + ' else x')
                for column in columns:
                    df_each_year[column] = df_each_year[column].apply(alter)

        # the primarykey function is applied to every file individually, which
        # allows replacing st_case (not unique across years) with primary_key_case
        df_each_year = primarykey(df_each_year, information)

        if information[0] >= configs['start_year_writing']:
            frame_list.append(df_each_year)

    if not frame_list:
        # every file of this table is older than start_year_writing
        print('no data to write for ' + i)
        continue

    df = pd.concat(frame_list)  # create one big dataframe out of the different small ones

    table_name = information[1]  # lower-cased file name without the csv ending

    if configs['delete_existing']:
        delete_table(information, engine)

    if df.shape[0]*df.shape[1] > max_df_elements:
        print(df.shape[0])

        # chunk size was tested on a 32GB RAM Windows system, adaptations might
        # be needed; it is negative because the range below counts downwards
        chunk_size = int(max_df_elements / -530)

        # The slices are taken from the end of the dataframe towards its start.
        # Original author's note: this starts with the most recent data to have
        # values in as many columns as possible, which reduced the execution
        # time of to_sql drastically compared to chronological inserts.
        for end in range(df.shape[0], 0, chunk_size):
            start = max(end + chunk_size, 0)  # iloc must not get a negative start
            df_subset = df.iloc[start:end]
            add_to_database(df_subset, table_name, engine)
            del df_subset
    else:
        # this is for small and medium sized tables
        add_to_database(df, table_name, engine)


NameError: name 'paths' is not defined

In [15]:
# scratch check: membership test on a range
candidate_values = range(1, 10)
if 2 in candidate_values:
    print(1)

1
