In [1]:
import pandas as pd
import numpy as np
from sqlalchemy import create_engine

def load_311data_from_s3():
    """Read the 311 service-request CSV from S3 and keep the noise complaints.

    The complete CSV is loaded into memory first and then filtered on the
    ``Complaint Type`` column. A success message and the shape of the
    filtered frame are printed.

    Returns:
        pandas.DataFrame: every column of the source file, restricted to rows
        whose ``Complaint Type`` is 'Noise - Residential',
        'Noise - Street/Sidewalk' or 'Noise - Vehicle'.
    """
    # Location of the full export in the S3 bucket.
    source_csv = 's3://information-arch-yuehao-wang-assignment-8a/311_Service_Requests_from_2019_to_Present.csv'

    # The only complaint types this pipeline loads.
    noise_types = [
        'Noise - Residential',
        'Noise - Street/Sidewalk',
        'Noise - Vehicle',
    ]

    all_requests = pd.read_csv(source_csv)
    noise_requests = all_requests[all_requests['Complaint Type'].isin(noise_types)]

    print("load 311 data from s3 successfully")
    print(noise_requests.shape)

    return noise_requests

def save_into_rds_by_name(sub_df, table_name, if_exists='replace'):
    """Write a DataFrame to a table of the ``m4`` MySQL schema on RDS.

    The frame's shape is printed before the write and a confirmation message
    is printed after it succeeds.

    Args:
        sub_df: DataFrame to persist; the index is not written.
        table_name: name of the destination table.
        if_exists: forwarded unchanged to ``DataFrame.to_sql``
            (defaults to 'replace').

    Raises:
        Whatever ``DataFrame.to_sql`` / the database driver raises; the
        engine is still disposed before the exception propagates.
    """
    print(sub_df.shape)

    # Engine built with sqlalchemy + pymysql: user "admin", RDS host, schema "m4".
    # NOTE(review): the user name and password are hard-coded in this URL;
    # they should be moved out of source control (environment/secret store).
    engine = create_engine('mysql+pymysql://admin:12345678@m4-test-2.cwog1e14mewr.us-east-1.rds.amazonaws.com/m4')

    try:
        # Transfer the frame to MySQL in batches of 100000 rows.
        sub_df.to_sql(con=engine, name=table_name, if_exists=if_exists, index=False, chunksize=100000)
    finally:
        # A fresh engine (and connection pool) is created on every call, so
        # release its pooled connections instead of leaking them.
        engine.dispose()

    print(table_name + " save into rds successfully")

    
def save_categories_values(data_sample):
    """Run every table writer on the same 311 frame.

    The per-category frequency tables are written first and the item table
    last; each writer receives ``data_sample`` unchanged.

    Args:
        data_sample: DataFrame of 311 requests passed to each writer.
    """
    # Order matters only in that it mirrors the sequence of tables written.
    table_writers = (
        save_agency,
        save_complaint_type,
        save_complaint_descriptor,
        save_zip_code,
        save_city,
        save_resolution_description,
        save_borough,
        save_open_data_channel_type,
        save_311_items,
    )

    for write_table in table_writers:
        write_table(data_sample)
    
    
def save_agency(data_sample):
    """Write the (agency, agency_name) pairs and their row counts.

    Prints a progress message and the first two selected rows, counts how
    many rows fall under each ('Agency', 'Agency Name') pair, and stores the
    result in table ``t_311_agency`` with columns ``agency``,
    ``agency_name`` and ``freq``.

    Args:
        data_sample: DataFrame containing 'Agency' and 'Agency Name' columns.
    """
    key_columns = ['Agency', 'Agency Name']
    agencies = data_sample[key_columns]

    print("load 311 agency data successfully")
    print(agencies.head(2))

    # One output row per distinct pair; 'freq' holds the group size.
    agency_counts = (
        agencies.groupby(key_columns)
        .size()
        .reset_index(name='freq')
        .rename(columns={'Agency': 'agency', 'Agency Name': 'agency_name'})
    )

    save_into_rds_by_name(agency_counts, 't_311_agency')
    

def save_complaint_type(data_sample):
    """Write each complaint type with the number of rows that carry it.

    Prints a progress message and the first two selected rows, then stores
    the per-type counts in table ``t_311_complaint_type`` with columns
    ``complaint_type`` and ``freq``.

    Args:
        data_sample: DataFrame containing a 'Complaint Type' column.
    """
    complaint_types = data_sample[['Complaint Type']]

    print("load 311 Complaint Type data successfully")
    print(complaint_types.head(2))

    # One output row per distinct complaint type; 'freq' holds the group size.
    type_counts = (
        complaint_types.groupby(['Complaint Type'])
        .size()
        .reset_index(name='freq')
        .rename(columns={'Complaint Type': 'complaint_type'})
    )

    save_into_rds_by_name(type_counts, 't_311_complaint_type')
    

def save_complaint_descriptor(data_sample):
    """Write each complaint descriptor with the number of rows that carry it.

    Prints a progress message and the first two selected rows, then stores
    the per-descriptor counts in table ``t_311_complaint_descriptor`` with
    columns ``complaint_descriptor`` and ``freq``.

    Args:
        data_sample: DataFrame containing a 'Descriptor' column.
    """
    descriptors = data_sample[['Descriptor']]

    print("load 311 Complaint Descriptor data successfully")
    print(descriptors.head(2))

    # One output row per distinct descriptor; 'freq' holds the group size.
    descriptor_counts = (
        descriptors.groupby(['Descriptor'])
        .size()
        .reset_index(name='freq')
        .rename(columns={'Descriptor': 'complaint_descriptor'})
    )

    save_into_rds_by_name(descriptor_counts, 't_311_complaint_descriptor')

    
def save_zip_code(data_sample):
    """Write each incident zip code with the number of rows that carry it.

    Prints a progress message and the first two selected rows, then stores
    the per-zip counts in table ``t_311_zipcode`` with columns ``zipcode``
    and ``freq``.

    Args:
        data_sample: DataFrame containing an 'Incident Zip' column.
    """
    # NOTE(review): sample output prints zips as floats (e.g. 11234.0), so
    # 'zipcode' is presumably written as a float column -- confirm intended.
    zip_codes = data_sample[['Incident Zip']]

    print("load 311 Incident Zip data successfully")
    print(zip_codes.head(2))

    # One output row per distinct zip code; 'freq' holds the group size.
    zip_counts = (
        zip_codes.groupby(['Incident Zip'])
        .size()
        .reset_index(name='freq')
        .rename(columns={'Incident Zip': 'zipcode'})
    )

    save_into_rds_by_name(zip_counts, 't_311_zipcode')
    
    
def save_city(data_sample):
    """Write each city with the number of rows that carry it.

    Prints a progress message and the first two selected rows, then stores
    the per-city counts in table ``t_311_city`` with columns ``city`` and
    ``freq``.

    Args:
        data_sample: DataFrame containing a 'City' column.
    """
    cities = data_sample[['City']]

    print("load 311 City data successfully")
    print(cities.head(2))

    # One output row per distinct city; 'freq' holds the group size.
    city_counts = (
        cities.groupby(['City'])
        .size()
        .reset_index(name='freq')
        .rename(columns={'City': 'city'})
    )

    save_into_rds_by_name(city_counts, 't_311_city')
    

def save_resolution_description(data_sample):
    """Write each resolution description with the number of rows that carry it.

    Prints a progress message and the first two selected rows, then stores
    the per-description counts in table ``t_311_resolution_description``
    with columns ``resolution_description`` and ``freq``.

    Args:
        data_sample: DataFrame containing a 'Resolution Description' column.
    """
    resolutions = data_sample[['Resolution Description']]

    print("load 311 Resolution Description data successfully")
    print(resolutions.head(2))

    # One output row per distinct description; 'freq' holds the group size.
    resolution_counts = (
        resolutions.groupby(['Resolution Description'])
        .size()
        .reset_index(name='freq')
        .rename(columns={'Resolution Description': 'resolution_description'})
    )

    save_into_rds_by_name(resolution_counts, 't_311_resolution_description')
    

def save_borough(data_sample):
    """Write each borough with the number of rows that carry it.

    Prints a progress message and the first two selected rows, then stores
    the per-borough counts in table ``t_311_borough`` with columns
    ``borough`` and ``freq``.

    Args:
        data_sample: DataFrame containing a 'Borough' column.
    """
    boroughs = data_sample[['Borough']]

    print("load 311 Borough data successfully")
    print(boroughs.head(2))

    # One output row per distinct borough; 'freq' holds the group size.
    borough_counts = (
        boroughs.groupby(['Borough'])
        .size()
        .reset_index(name='freq')
        .rename(columns={'Borough': 'borough'})
    )

    save_into_rds_by_name(borough_counts, 't_311_borough')


def save_open_data_channel_type(data_sample):
    """Write each open-data channel type with the number of rows that carry it.

    Prints a progress message and the first two selected rows, then stores
    the per-channel counts in table ``t_311_open_data_channel_type`` with
    columns ``open_data_channel_type`` and ``freq``.

    Args:
        data_sample: DataFrame containing an 'Open Data Channel Type' column.
    """
    channels = data_sample[['Open Data Channel Type']]

    print("load 311 Open Data Channel Type data successfully")
    print(channels.head(2))

    # One output row per distinct channel type; 'freq' holds the group size.
    channel_counts = (
        channels.groupby(['Open Data Channel Type'])
        .size()
        .reset_index(name='freq')
        .rename(columns={'Open Data Channel Type': 'open_data_channel_type'})
    )

    save_into_rds_by_name(channel_counts, 't_311_open_data_channel_type')
    

def save_311_items(data_sample):
    """Append the per-request detail rows to table ``t_311_items``.

    Selects sixteen columns from the 311 frame, renames them to snake_case,
    converts the three date columns with ``pd.to_datetime``, prints a
    progress message and the first two rows, and appends the result to the
    database table.

    Args:
        data_sample: DataFrame containing every source column listed in the
            mapping below.
    """
    # Source column -> destination column, in the order they are written.
    column_names = {
        'Unique Key': 'unique_key',
        'Created Date': 'created_date',
        'Closed Date': 'closed_date',
        'Agency': 'agency',
        'Agency Name': 'agency_name',
        'Complaint Type': 'complaint_type',
        'Descriptor': 'descriptor',
        'Incident Zip': 'incident_zip',
        'City': 'city',
        'Status': 'status',
        'Resolution Description': 'resolution_description',
        'Resolution Action Updated Date': 'resolution_action_updated_date',
        'Borough': 'borough',
        'Open Data Channel Type': 'open_data_channel_type',
        'Latitude': 'latitude',
        'Longitude': 'longitude',
    }
    items = data_sample.loc[:, list(column_names)].rename(columns=column_names)

    # Parse the textual timestamps into datetime values before the write.
    for date_column in ('created_date', 'closed_date', 'resolution_action_updated_date'):
        items[date_column] = pd.to_datetime(items[date_column])

    print("load 311 items successfully")
    print(items.head(2))

    # NOTE(review): unlike the frequency tables (which use the default
    # 'replace'), this table is appended to, so re-running the load adds the
    # same rows again -- confirm this is intended.
    save_into_rds_by_name(items, 't_311_items', if_exists='append')


    
# Defining main function
def main():
    """Load the filtered 311 data from S3 and write all tables from it."""
    save_categories_values(load_311data_from_s3())

  
# Run the pipeline only when this file is executed directly
# (``__name__`` is "__main__"), not when it is imported.
if __name__=="__main__":
    main()


  exec(code_obj, self.user_global_ns, self.user_ns)


load 311 data from s3 successfully
(1115850, 32)
load 311 agency data successfully
  Agency                      Agency Name
3   NYPD  New York City Police Department
4   NYPD  New York City Police Department
(2, 3)
t_311_agency save into rds successfully
load 311 Complaint Type data successfully
            Complaint Type
3  Noise - Street/Sidewalk
4  Noise - Street/Sidewalk
(3, 2)
t_311_complaint_type save into rds successfully
load 311 Complaint Descriptor data successfully
         Descriptor
3  Loud Music/Party
4  Loud Music/Party
(7, 2)
t_311_complaint_descriptor save into rds successfully
load 311 Incident Zip data successfully
   Incident Zip
3       11234.0
4       11209.0
(217, 2)
t_311_zipcode save into rds successfully
load 311 City data successfully
       City
3  BROOKLYN
4  BROOKLYN
(49, 2)
t_311_city save into rds successfully
load 311 Resolution Description data successfully
                              Resolution Description
3  The Police Department responded to the 