# Import Python Packages

In [1]:
# Import Python packages
import pandas as pd
import psycopg2

# Connection Set up 

## DB Params

#### Main DB

In [37]:
DB_ENDPOINT = "csvtoazurepostgres.postgres.database.azure.com"
DEFAULT_DB = 'postgres'
DB = 'csv_etl'
DB_USER = 'tempUser_accident_data_ETL'
DB_PASSWORD = 'A7.Kr2phuVjagj3'
DB_PORT = '5432'

#### Test Local DB

In [2]:
DB_ENDPOINT = "localhost"
DEFAULT_DB = 'postgres'
DB = 'csv_etl'
DB_USER = 'postgres'
DB_PASSWORD = '1234'
DB_PORT = '5432'

## Create a connection to Default Database

In [3]:
try: 
    conn = psycopg2.connect(
                        host=DB_ENDPOINT,
                        port=DB_PORT,
                        dbname=DEFAULT_DB,
                        user=DB_USER,
                        password=DB_PASSWORD)
    print("Connection established")
    conn.set_session(autocommit=True)
    print("Connection is in Auto Commit")
except psycopg2.Error as e: 
    print("Error: Could not make connection to the Default Postgres database")
    print(e)

Connection established
Connection is in Auto Commit


## Create Cursor

In [4]:
try: 
    cursor = conn.cursor()
except psycopg2.Error as e: 
    print("Error: Could not get cursor to the Database")
    print(e)

## Create New DB and reset connection 

In [5]:
query = f"DROP DATABASE IF EXISTS {DB}"
try: 
    cursor.execute(query)
    query = f"CREATE DATABASE {DB}"
    cursor.execute(query)
except psycopg2.Error as e: 
    print("Error: Issue creating DataBase")
    print (e)

### Reset connection to the Desired DB

In [6]:
cursor.close()
conn.close()
try: 
    conn = psycopg2.connect(
                        host=DB_ENDPOINT,
                        port=DB_PORT,
                        dbname=DB,
                        user=DB_USER,
                        password=DB_PASSWORD)
    print("Connection established")
    conn.set_session(autocommit=True)
    print("Connection is in Auto Commit")
except psycopg2.Error as e: 
    print(f"Error: Could not make connection to the {DB} database")
    print(e)
    
try: 
    cursor = conn.cursor()
except psycopg2.Error as e: 
    print("Error: Could not get cursor to the Database")
    print(e)

Connection established
Connection is in Auto Commit


# Clean DATA , DROP unconstant Data, DROP Unused DATA

In [9]:
# Show numeric output in decimal format e.g., 2.15
pd.options.display.float_format = '{:,.2f}'.format
data_frame_riders = pd.read_csv('../data/riders.csv',header=None,names=['rider_id','first','last','address','birthday','account_start_date','account_end_date','is_member'])
data_frame_payments = pd.read_csv('../data/payments.csv',header=None,names=['payment_id','date','amount','rider_id'])
data_frame_stations = pd.read_csv('../data/stations.csv',header=None,names=['station_id','name','latitude','longitude'])
data_frame_trips = pd.read_csv('../data/trips.csv',header=None,names=['trip_id','rideable_type','start_at','ended_at','start_station_id','end_station_id','rider_id'])

## Explore the Data & Check for Consistency

### Riders CSV 

In [10]:
print(data_frame_riders.shape)
print(data_frame_riders.info())
print(data_frame_riders.sample())

(75000, 8)
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 75000 entries, 0 to 74999
Data columns (total 8 columns):
 #   Column              Non-Null Count  Dtype 
---  ------              --------------  ----- 
 0   rider_id            75000 non-null  int64 
 1   first               75000 non-null  object
 2   last                75000 non-null  object
 3   address             75000 non-null  object
 4   birthday            75000 non-null  object
 5   account_start_date  75000 non-null  object
 6   account_end_date    14954 non-null  object
 7   is_member           75000 non-null  bool  
dtypes: bool(1), int64(1), object(6)
memory usage: 4.1+ MB
None
       rider_id first     last                       address    birthday  \
42978     43978  Mark  Gilbert  9808 Alexis Mission Apt. 285  1984-12-11   

      account_start_date account_end_date  is_member  
42978         2017-11-13              NaN       True  


In [11]:
print(f'Missing values for data set?: {data_frame_riders.isna().values.any()}')
print(f'Missing values for data set?: {data_frame_riders.isna().values.sum()}')
print(data_frame_riders.isna)
data_frame_riders['account_end_date']  = data_frame_riders['account_end_date'] .fillna(0)
print(f'Missing values for data set?: {data_frame_riders.isna().values.any()}')
print(f'Missing values for data set?: {data_frame_riders.isna().values.sum()}')
print(data_frame_riders.isna)
#data_frame_riders.dropna(inplace=True)
# Remove duplicates
print(f'duplicates values for data set?: {data_frame_riders.duplicated().values.any()}')
print(f'duplicates values for data set?: {data_frame_riders.duplicated().values.sum()}')
#print(data_frame_riders.duplicated)  

Missing values for data set?: True
Missing values for data set?: 60046
<bound method DataFrame.isna of        rider_id       first       last                        address  \
0          1000       Diana      Clark            1200 Alyssa Squares   
1          1001    Jennifer      Smith                397 Diana Ferry   
2          1002       Karen      Smith      644 Brittany Row Apt. 097   
3          1003       Bryan    Roberts         996 Dickerson Turnpike   
4          1004       Jesse  Middleton         7009 Nathan Expressway   
...         ...         ...        ...                            ...   
74995     75995  Alexandria      Smith    8045 Amanda Union Suite 942   
74996     75996      Nicole     Watson               5255 Andrea View   
74997     75997       James   Martinez  88210 Sheryl Islands Apt. 191   
74998     75998       Nancy      Watts      0175 Mary Hollow Apt. 187   
74999     75999        Ross      Green           8086 Fernandez Drive   

         birthday ac

#### Check  data after cleanning

In [12]:
print(data_frame_riders.shape)
print(data_frame_riders.sample())

(75000, 8)
       rider_id  first   last               address    birthday  \
54101     55101  Laura  White  24512 Danielle Plaza  1990-03-08   

      account_start_date account_end_date  is_member  
54101         2018-05-11                0      False  


#### Change Columns type to the desired format

In [13]:
print(data_frame_riders.info())

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 75000 entries, 0 to 74999
Data columns (total 8 columns):
 #   Column              Non-Null Count  Dtype 
---  ------              --------------  ----- 
 0   rider_id            75000 non-null  int64 
 1   first               75000 non-null  object
 2   last                75000 non-null  object
 3   address             75000 non-null  object
 4   birthday            75000 non-null  object
 5   account_start_date  75000 non-null  object
 6   account_end_date    75000 non-null  object
 7   is_member           75000 non-null  bool  
dtypes: bool(1), int64(1), object(6)
memory usage: 4.1+ MB
None


In [14]:
data_frame_riders['account_end_date']  = pd.to_datetime(data_frame_riders['account_end_date'])
data_frame_riders['birthday']  = pd.to_datetime(data_frame_riders['birthday'])
data_frame_riders['account_start_date']  = pd.to_datetime(data_frame_riders['account_start_date'])

#### See the new data types and if the data is ready

In [15]:
print(data_frame_riders.info())

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 75000 entries, 0 to 74999
Data columns (total 8 columns):
 #   Column              Non-Null Count  Dtype         
---  ------              --------------  -----         
 0   rider_id            75000 non-null  int64         
 1   first               75000 non-null  object        
 2   last                75000 non-null  object        
 3   address             75000 non-null  object        
 4   birthday            75000 non-null  datetime64[ns]
 5   account_start_date  75000 non-null  datetime64[ns]
 6   account_end_date    75000 non-null  datetime64[ns]
 7   is_member           75000 non-null  bool          
dtypes: bool(1), datetime64[ns](3), int64(1), object(3)
memory usage: 4.1+ MB
None


### Payments CSV 

In [16]:
print(data_frame_payments.shape)
print(data_frame_payments.info())
print(data_frame_payments.sample())

(1946607, 4)
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1946607 entries, 0 to 1946606
Data columns (total 4 columns):
 #   Column      Dtype  
---  ------      -----  
 0   payment_id  int64  
 1   date        object 
 2   amount      float64
 3   rider_id    int64  
dtypes: float64(1), int64(2), object(1)
memory usage: 59.4+ MB
None
         payment_id        date  amount  rider_id
1383515     1383516  2021-02-01    9.00     54403


In [17]:
print(f'Missing values for data set?: {data_frame_payments.isna().values.any()}')
print(f'Missing values for data set?: {data_frame_payments.isna().values.sum()}')
# Remove duplicates
print(f'duplicates values for data set?: {data_frame_payments.duplicated().values.any()}')
print(f'duplicates values for data set?: {data_frame_payments.duplicated().values.sum()}')

Missing values for data set?: False
Missing values for data set?: 0
duplicates values for data set?: False
duplicates values for data set?: 0


#### Change Columns type to the desired format

In [18]:
print(data_frame_payments.info())

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1946607 entries, 0 to 1946606
Data columns (total 4 columns):
 #   Column      Dtype  
---  ------      -----  
 0   payment_id  int64  
 1   date        object 
 2   amount      float64
 3   rider_id    int64  
dtypes: float64(1), int64(2), object(1)
memory usage: 59.4+ MB
None


In [19]:
data_frame_payments['date']  = pd.to_datetime(data_frame_payments['date'])

In [20]:
print(data_frame_payments.info())

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1946607 entries, 0 to 1946606
Data columns (total 4 columns):
 #   Column      Dtype         
---  ------      -----         
 0   payment_id  int64         
 1   date        datetime64[ns]
 2   amount      float64       
 3   rider_id    int64         
dtypes: datetime64[ns](1), float64(1), int64(2)
memory usage: 59.4 MB
None


### Stations CSV 

In [21]:
print(data_frame_stations.shape)
print(data_frame_stations.info())
print(data_frame_stations.sample())

(838, 4)
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 838 entries, 0 to 837
Data columns (total 4 columns):
 #   Column      Non-Null Count  Dtype  
---  ------      --------------  -----  
 0   station_id  838 non-null    object 
 1   name        838 non-null    object 
 2   latitude    838 non-null    float64
 3   longitude   838 non-null    float64
dtypes: float64(2), object(2)
memory usage: 26.3+ KB
None
      station_id                     name  latitude  longitude
91  TA1306000029  Lake Shore Dr & Ohio St     41.89     -87.61


In [22]:
print(f'Missing values for data set?: {data_frame_stations.isna().values.any()}')
print(f'Missing values for data set?: {data_frame_stations.isna().values.sum()}')
# Remove duplicates
print(f'duplicates values for data set?: {data_frame_stations.duplicated().values.any()}')
print(f'duplicates values for data set?: {data_frame_stations.duplicated().values.sum()}')

Missing values for data set?: False
Missing values for data set?: 0
duplicates values for data set?: False
duplicates values for data set?: 0


### Trips CSV 

In [23]:
print(data_frame_trips.shape)
print(data_frame_trips.info())
print(data_frame_trips.sample())

(4584921, 7)
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 4584921 entries, 0 to 4584920
Data columns (total 7 columns):
 #   Column            Dtype 
---  ------            ----- 
 0   trip_id           object
 1   rideable_type     object
 2   start_at          object
 3   ended_at          object
 4   start_station_id  object
 5   end_station_id    object
 6   rider_id          int64 
dtypes: int64(1), object(6)
memory usage: 244.9+ MB
None
                  trip_id rideable_type             start_at  \
2128012  7E4B6A16CAAF9623  classic_bike  2021-07-10 14:51:01   

                    ended_at start_station_id end_station_id  rider_id  
2128012  2021-07-10 15:00:12     TA1307000127   TA1307000052     36351  


In [24]:
print(f'Missing values for data set?: {data_frame_trips.isna().values.any()}')
print(f'Missing values for data set?: {data_frame_trips.isna().values.sum()}')
## Remove duplicates
print(f'duplicates values for data set?: {data_frame_trips.duplicated().values.any()}')
print(f'duplicates values for data set?: {data_frame_trips.duplicated().values.sum()}')

Missing values for data set?: False
Missing values for data set?: 0
duplicates values for data set?: False
duplicates values for data set?: 0


#### Change Columns type to the desired format

In [25]:
print(data_frame_trips.info())

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 4584921 entries, 0 to 4584920
Data columns (total 7 columns):
 #   Column            Dtype 
---  ------            ----- 
 0   trip_id           object
 1   rideable_type     object
 2   start_at          object
 3   ended_at          object
 4   start_station_id  object
 5   end_station_id    object
 6   rider_id          int64 
dtypes: int64(1), object(6)
memory usage: 244.9+ MB
None


In [26]:
data_frame_trips['start_at']  = pd.to_datetime(data_frame_trips['start_at'])
data_frame_trips['ended_at']  = pd.to_datetime(data_frame_trips['ended_at'])

In [27]:
print(data_frame_trips.info())

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 4584921 entries, 0 to 4584920
Data columns (total 7 columns):
 #   Column            Dtype         
---  ------            -----         
 0   trip_id           object        
 1   rideable_type     object        
 2   start_at          datetime64[ns]
 3   ended_at          datetime64[ns]
 4   start_station_id  object        
 5   end_station_id    object        
 6   rider_id          int64         
dtypes: datetime64[ns](2), int64(1), object(4)
memory usage: 244.9+ MB
None


# Populating Cleaned Data 

## Automation Functions

In [28]:
# Helper functions
import psycopg2.extras as extras
def execute_values(conn, df, table):
  
    tuples = [tuple(x) for x in df.to_numpy()]
  
    cols = ','.join(list(df.columns))
  
    # SQL query to execute
    query = "INSERT INTO %s(%s) VALUES %%s" % (table, cols)
    cursor = conn.cursor()
    try:
        extras.execute_values(cursor, query, tuples)
        conn.commit()
    except (Exception, psycopg2.DatabaseError) as error:
        print("Error: %s" % error)
        conn.rollback()
        cursor.close()
        return 1
    print("execute_values() done")
    cursor.close()

'\ndef table_recreate(cursor, tableName: str, tableFields: str ):\n     " Takes the Connection , Table name and create Fields, will drop the table if exists and create it again with the desired fields Ex : table_recreate(cursor, payments,\n                    "(sessionId INT,\n                    itemInSession INT,\n                    artist TEXT,\n                    song TEXT,\n                    length FLOAT, \n                    PRIMARY KEY (sessionId, itemInSession))")  "\n    try:\n        \n        cursor.execute("DROP TABLE IF EXISTS {0};".format(tableName))\n        query = f"CREATE TABLE {tableName} "\n        query = query + tableFields\n        cursor.execute(query)\n        print("Finished creating table {0}".format(tableName))\n    except psycopg2.Error as e: \n    print(f"Error: Couldn\'t recreate the table: {tableName}, something went wrong")\n    print(e)\n'

## Create Tables & Insert data

### Riders Table 

#### Create

In [29]:
TABLE = "riders"
FIELDS = """(rider_id INTEGER PRIMARY KEY,
                    first VARCHAR(50),
                    last VARCHAR(50), 
                    address VARCHAR(100), 
                    birthday DATE, 
                    account_start_date DATE, 
                    account_end_date DATE, 
                    is_member BOOLEAN);"""
try:    
        cursor.execute("DROP TABLE IF EXISTS {0};".format(TABLE))
        query = f"CREATE TABLE {TABLE} "
        query = query + FIELDS
        cursor.execute(query)
        print("Finished creating table {0}".format(TABLE))
except psycopg2.Error as e: 
    print(f"Error: Couldn't recreate the table: {TABLE}, something went wrong")
    print(e)

Finished creating table riders


#### Populate

In [30]:
for index, row in data_frame_riders.iterrows():   
    query = "INSERT INTO riders (rider_id, first, last, address, birthday, account_start_date, account_end_date, is_member)"
    query = query + "VALUES (%s, %s, %s, %s, %s, %s, %s, %s)"
    try:
        cursor.execute(query, (row["rider_id"], row["first"], row["last"], row["address"], row["birthday"], row["account_start_date"], row["account_end_date"], row["is_member"]))
    except Exception as e:
        print(e)
print("Finished Populating ")

Finished Populating 


#### Test Data 

In [31]:
try:    
        query = f"SELECT Count (*) FROM  riders"
        cursor.execute(query)
except psycopg2.Error as e: 
    print(e)
row = cursor.fetchone()
while row:
   print(row)
   row = cursor.fetchone()

(75000,)


### Payments Table 

#### Create

In [32]:
TABLE = "payment"
FIELDS = """(payment_id INTEGER PRIMARY KEY, 
                    date DATE, 
                    amount MONEY, 
                    rider_id INTEGER);"""
try:    
        cursor.execute("DROP TABLE IF EXISTS {0};".format(TABLE))
        query = f"CREATE TABLE {TABLE} "
        query = query + FIELDS
        cursor.execute(query)
        print("Finished creating table {0}".format(TABLE))
except psycopg2.Error as e: 
    print(f"Error: Couldn't recreate the table: {TABLE}, something went wrong")
    print(e)

Finished creating table payment


In [33]:
query = "ALTER TABLE payment SET UNLOGGED"
try:    
        cursor.execute(query)
except psycopg2.Error as e: 
    print(e)

#### Populate

In [34]:
try:
    execute_values(conn, data_frame_payments, 'payment')
except Exception as e:
    print(e)
print("Finished Populating ")

execute_values() done
Finished Populating 


In [None]:
# This is super slow for 2m rows
"""
query = "INSERT INTO payment (payment_id, date, amount, rider_id)"
query = query + "VALUES (%s, %s, %s, %s)"
for index, row in data_frame_payments.iterrows():    
    try:
        cursor.execute(query, (row["payment_id"], row["date"], row["amount"], row["rider_id"]))
    except Exception as e:
        print(e)
Print("Finished Populating ")
"""

#### Test Data 

In [44]:
try:    
        query = f"SELECT COUNT(*) FROM  payment"
        cursor.execute(query)
except psycopg2.Error as e: 
    print(e)
row = cursor.fetchone()
while row:
   print(row)
   row = cursor.fetchone()

(1946607,)


In [36]:
query = "ALTER TABLE payment SET LOGGED"
try:
    cursor.execute(query)
except psycopg2.Error as e:
    print(e)

### Stations Table 

#### Create

In [37]:
TABLE = "station"
FIELDS = """(station_id VARCHAR(50) PRIMARY KEY, 
                    name VARCHAR(75), 
                    latitude FLOAT, 
                    longitude FLOAT);"""
try:    
        cursor.execute("DROP TABLE IF EXISTS {0};".format(TABLE))
        query = f"CREATE TABLE {TABLE} "
        query = query + FIELDS
        cursor.execute(query)
        print("Finished creating table {0}".format(TABLE))
except psycopg2.Error as e: 
    print(f"Error: Couldn't recreate the table: {TABLE}, something went wrong")
    print(e)

Finished creating table station


#### Populate

In [38]:
for index, row in data_frame_stations.iterrows():   
    query = "INSERT INTO station (station_id, name, latitude, longitude)"
    query = query + "VALUES (%s, %s, %s, %s)"
    try:
        cursor.execute(query, (row["station_id"], row["name"], row["latitude"], row["longitude"]))
    except Exception as e:
        print(e)
print("Finished Populating ")

Finished Populating 


#### Test Data 

In [43]:
try:    
        query = f"SELECT COUNT(*) FROM  station"
        cursor.execute(query)
except psycopg2.Error as e: 
    print(e)
row = cursor.fetchone()
while row:
   print(row)
   row = cursor.fetchone()

(838,)


### Trips Table 

#### Create

In [40]:
TABLE = "trip"
FIELDS = """(trip_id VARCHAR(50) PRIMARY KEY,
                    rideable_type VARCHAR(75),
                    start_at TIMESTAMP,
                    ended_at TIMESTAMP,
                    start_station_id VARCHAR(50),
                    end_station_id VARCHAR(50),
                    rider_id INTEGER);"""
try:
        cursor.execute("DROP TABLE IF EXISTS {0};".format(TABLE))
        query = f"CREATE TABLE {TABLE} "
        query = query + FIELDS
        cursor.execute(query)
        print("Finished creating table {0}".format(TABLE))
except psycopg2.Error as e:
    print(f"Error: Couldn't recreate the table: {TABLE}, something went wrong")
    print(e)

Finished creating table trip


In [41]:
query = "ALTER TABLE trip SET UNLOGGED"
try:
    cursor.execute(query)
except psycopg2.Error as e:
    print(e)

#### Populate

In [42]:
try:
    execute_values(conn, data_frame_trips, 'trip')
except Exception as e:
    print(e)
print("Finished Populating ")

execute_values() done
Finished Populating 


In [None]:
# This is super slow for 4m rows
"""
for index, row in data_frame_trips.iterrows():   
    query = "INSERT INTO trip (trip_id, rideable_type, start_at, ended_at, start_station_id, end_station_id, rider_id)"
    query = query + "VALUES (%s, %s, %s, %s, %s, %s, %s)"
    try:
        cursor.execute(query, (row["trip_id"], row["rideable_type"], row["start_at"], row["ended_at"], row["start_station_id"], row["end_station_id"], row["rider_id"]))
    except Exception as e:
        print(e)
Print("Finished Populating ")
"""

In [45]:
query = "ALTER TABLE trip SET LOGGED"
try:
    cursor.execute(query)
except psycopg2.Error as e:
    print(e)

#### Test Data 

In [46]:
try:    
        query = f"SELECT COUNT(*) FROM  trip "
        cursor.execute(query)
except psycopg2.Error as e: 
    print(e)
row = cursor.fetchone()
while row:
   print(row)
   row = cursor.fetchone()

(4584921,)


In [49]:
import sqlalchemy as sq

engine = sq.create_engine(f"postgresql+psycopg2://{DB_USER}:{DB_PASSWORD}@{DB_ENDPOINT}:{DB_PORT}/{DB}")
the_frame = pd.read_sql_table('riders', engine)

In [50]:
insp = sq.inspect(engine)
insp.get_columns(table_name='riders')
insp.get_table_names()

['riders', 'payment', 'station', 'trip']

In [51]:
print(the_frame.info())

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 75000 entries, 0 to 74999
Data columns (total 8 columns):
 #   Column              Non-Null Count  Dtype         
---  ------              --------------  -----         
 0   rider_id            75000 non-null  int64         
 1   first               75000 non-null  object        
 2   last                75000 non-null  object        
 3   address             75000 non-null  object        
 4   birthday            75000 non-null  datetime64[ns]
 5   account_start_date  75000 non-null  datetime64[ns]
 6   account_end_date    75000 non-null  datetime64[ns]
 7   is_member           75000 non-null  bool          
dtypes: bool(1), datetime64[ns](3), int64(1), object(3)
memory usage: 4.1+ MB
None
