# Data Science Project 1
## Frances Dai
## Omar Zeineddine
### needed local files:
### - "new_york2.csv" < this is the weather data we use to create a sql database
### - "taxi_tripdata.csv" < NYC taxi trip data used for csv file

In [79]:
import pandas as pd
import mysql.connector
from mysql.connector import Error
import numpy as np
from sqlalchemy import create_engine
import requests
import requests.exceptions
import cryptography
from urllib.parse import quote_plus

## Source 1: SQL Database (created locally)

### Variables

In [80]:
# MySQL connection information -- change as needed
host = "localhost"
user = "root"
password = "ds2002fd"
port = "3306"

# new york weather data file name, data taken from https://www.visualcrossing.com/weather/weather-data-services, used to create local sql databas
csv_file = "new_york2.csv"

# local sql database info
db_name = "new_york_weather_data"
table_name = "new_york_weather_data"

### Functions

In [81]:
# making new york weather sql database functions

# function for server connection and error handle
def create_server_connection(host_name, user_name, user_password, port):
    connection = None
    try:
        connection = mysql.connector.connect(
            host=host_name,
            user=user_name,
            passwd=user_password,
            port=port
        )
        print("MySQL Database connection successful")
    except Error as err:
        print(f"Error: '{err}'")
    return connection

# function to create a database and error handle
def create_database(connection, query):
    cursor = connection.cursor()
    try:
        cursor.execute(query)
        print("Database created success")
    except Error as err:
        print(f"Error: '{err}'")

# function to connect to the database and error handle
def create_db_connection(host_name, user_name, user_password, db_name, port):
    connection = None
    try:
        connection = mysql.connector.connect(
            host=host_name,
            user=user_name,
            passwd=user_password,
            database=db_name,
            port=port
        )
        print("MySQL Database connection successful")
    except Error as err:
        print(f"Error: '{err}'")
    return connection

# Function to execute queries and error handle
def execute_query(connection, query):
    cursor = connection.cursor()
    try:
        cursor.execute(query)
        connection.commit()
        print("Query successful")
    except Error as err:
        print(f"Error: '{err}'")

# read CSV file into dataframe
def read_csv(csv_file):
    return pd.read_csv(csv_file)

# insert into table
def insert_into_table(connection, df, table_name):
    cursor = connection.cursor()

    df = df.astype(object).where(pd.notnull(df), None)

    # SQL INSERT statement
    columns = ", ".join([f"`{col}`" for col in df.columns])
    placeholders = ", ".join(["%s" for _ in df.columns])
    insert_statement = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"

    # prep data for insert
    data_tuples = [tuple(x) for x in df.values]

    # execute and handle errors for sql insert
    try:
        cursor.executemany(insert_statement, data_tuples)
        connection.commit()
        print(f"Data inserted into {table_name} successfully.")
    except Error as err:
        print(f"Error: '{err}'")

### Create Local SQL Database

In [82]:
# read weather csv file into dataframe
df = read_csv(csv_file)

# connect ot server
connection = create_server_connection(host, user, password, port)


# connect to database, create if not exist
create_database_query = f"CREATE DATABASE IF NOT EXISTS {db_name};"
create_database(connection, create_database_query)
connection = create_db_connection(host, user, password, db_name, port)


# create table
create_table_query = f"CREATE TABLE IF NOT EXISTS {table_name} (" + ", ".join([f"{col} TEXT" for col in df.columns]) + ");"
execute_query(connection, create_table_query)

# insert weather data
insert_into_table(connection, df, table_name)


MySQL Database connection successful
Database created success
MySQL Database connection successful
Query successful
Data inserted into new_york_weather_data successfully.


### Create and Populate Weather Dimension Table

In [83]:
# fetch data from table function
def fetch_data(connection, query):
    cursor = connection.cursor()
    try:
        cursor.execute(query)
        result = cursor.fetchall()
        return result
    except Error as err:
        print(f"Error: '{err}'")
        return None

In [84]:
# connect to mysql server
connection = create_server_connection(host, user, password, port)

# create new scheme for project
create_schema_query = "CREATE SCHEMA IF NOT EXISTS ds_project;"
execute_query(connection, create_schema_query)

# connect to schema
ds_project_connection = create_db_connection(host, user, password, "ds_project", port)

# create weather dimension table SQL query with only needed columns from database
create_weather_dim_table = """
CREATE TABLE IF NOT EXISTS weather_dim (
    weather_data_key INT AUTO_INCREMENT PRIMARY KEY,
    name TEXT,
    datetime DATETIME,
    preciptype TEXT,
    conditions TEXT,
    description TEXT
);
"""
execute_query(ds_project_connection, create_weather_dim_table)

# select weather connection fields to be inserted
ny_weather_connection = create_db_connection(host, user, password, "new_york_weather_data", port)
select_query = """
SELECT name, datetime, preciptype, conditions, description 
FROM new_york_weather_data;
"""
weather_data = fetch_data(ny_weather_connection, select_query)

# insert data query
insert_query = """
INSERT INTO weather_dim (name, datetime, preciptype, conditions, description)
VALUES (%s, %s, %s, %s, %s);
"""

# insert data, handle errors, close other sql connection
cursor = ds_project_connection.cursor()
try:
    cursor.executemany(insert_query, weather_data)
    ds_project_connection.commit()
    print(f"Data insert into weather_dim success")
except Error as err:
    print(f"Error: '{err}'")
finally:
    cursor.close()
    if ny_weather_connection.is_connected():
        ny_weather_connection.close()
        print("new_york_weather_data connection is closed")
    if ds_project_connection.is_connected():
        ds_project_connection.close()
        print("new_york_weather_data connection is closed")

MySQL Database connection successful
Query successful
MySQL Database connection successful
Query successful
MySQL Database connection successful
Data insert into weather_dim success
new_york_weather_data connection is closed
new_york_weather_data connection is closed


## Source 2: API Call

### Variables

In [85]:
date_from = "2021-07-01"
date_to = "2021-07-08"
stock = "AAPL"
api_key = "oH2o7jU2pcpoEgKMCVbMnTaLdegBMFbTGNfX2jhD"
# querystring = {"api_token": api_key, "symbols":stock, "date_from": date_from, "date_to": date_to }
url = "https://api.stockdata.org/v1/data/eod"

### Functions

In [86]:
# functions for api calls

def get_api_response(url, querystring):
    try:
        response = requests.request("GET", url, params=querystring)
        response.raise_for_status()
    
    except requests.exceptions.HTTPError as errh:
        return "An Http Error occurred: " + repr(errh)
    except requests.exceptions.ConnectionError as errc:
        return "An Error Connecting to the API occurred: " + repr(errc)
    except requests.exceptions.Timeout as errt:
        return "A Timeout Error occurred: " + repr(errt)
    except requests.exceptions.InvalidHeader as erri:
        return "A Header Error occurred: " + repr(erri)
    except requests.exceptions.RequestException as err:
        return "An Unknown Error occurred: " + repr(err)
        
    return response.json()

# function to insert only needed (transformed) stock data and handle errors
def insert_stock_data(connection, ticker, date):
    cursor = connection.cursor()
    query = """
    INSERT INTO ds_project.stocks_dim (ticker, date)
    VALUES (%s, %s);
    """
    try:
        cursor.execute(query, (ticker, date))
        connection.commit()
    except Error as err:
        print(f"Error: '{err}'")

### create and populate table

In [87]:
# make stocks_dim table query with only needed columns from api call
create_stocks_dim_table = """
CREATE TABLE IF NOT EXISTS ds_project.stocks_dim (
    stock_data_key INT AUTO_INCREMENT PRIMARY KEY,
    ticker VARCHAR(10),
    date DATE
);
"""

# execute stock_dim table making query
execute_query(connection, create_stocks_dim_table)

stocks = ["AAPL", "TSLA", "MSFT", "GOOG"]

# populate stock data into table
for stock in stocks:
    querystring = {"api_token": api_key, "symbols":stock, "date_from": date_from, "date_to": date_to }
    response = get_api_response(url, querystring)
    if isinstance(response, dict) and 'data' in response:
        for daily_data in response['data']:
            insert_stock_data(connection, stock, daily_data['date'][:10]) # trimming time from date
    else:
        print(f"Error retrieving data for stock: {stock}")

print("inserting stock data success")

Query successful
inserting stock data success


## Source 3: CSV file

## Variables

In [88]:
csv_file_path = "taxi_tripdata.csv"

# dates to insert
dates_to_insert = [
    "2021-07-01",
    "2021-07-02",
    "2021-07-03",
    "2021-07-04",
    "2021-07-05",
    "2021-07-06",
    "2021-07-07",
    "2021-07-08"
]

# function for query execute
def execute_query(connection, query, values=None):
    cursor = connection.cursor()
    try:
        if values:
            cursor.execute(query, values)
        else:
            cursor.execute(query)
        connection.commit()
        print("Query successful")
    except Error as err:
        print(f"Error: '{err}'")

## Create and execute queries

In [89]:
# make nyc taxi_dim table from csv file
csv_file_path = "taxi_tripdata.csv"
df = pd.read_csv(csv_file_path)

# transform data by dropping unneccessary columns
df.drop(["VendorID", "RatecodeID", "store_and_fwd_flag", "PULocationID", "DOLocationID", "passenger_count", "extra", "lpep_pickup_datetime", "extra", "mta_tax", "tip_amount", "tolls_amount", "ehail_fee", "improvement_surcharge", "total_amount", "payment_type", "trip_type", "congestion_surcharge"], axis=1, inplace=True)


# sql query to make nyc taxi table
create_taxi_dim_table = """
CREATE TABLE IF NOT EXISTS ds_project.taxi_dim (
    taxi_data_key INT AUTO_INCREMENT PRIMARY KEY,
    dropoff_datetime DATETIME,
    trip_distance DECIMAL(5, 2)
);
"""
# create table
execute_query(connection, create_taxi_dim_table)

# insert sql query
insert_taxi_data_query = """
INSERT INTO ds_project.taxi_dim (dropoff_datetime, trip_distance)
VALUES (%s, %s);
"""


# populate taxi_dim
for date_string in dates_to_insert:
    filtered_df = df[pd.to_datetime(df['lpep_dropoff_datetime']).dt.date == pd.to_datetime(date_string).date()]
    if not filtered_df.empty:
        first_entry = filtered_df.iloc[0]
        dropoff_datetime = pd.to_datetime(first_entry['lpep_dropoff_datetime'])
        trip_distance = first_entry['trip_distance']
        
        # insert data
        execute_query(connection, insert_taxi_data_query, (dropoff_datetime, trip_distance))
    else:
        print(f"No data for date {date_string}")

print("taxi dimensional table insert success")

  df = pd.read_csv(csv_file_path)


Query successful
Query successful
Query successful
Query successful
Query successful
Query successful
Query successful
Query successful
Query successful
taxi dimensional table insert success


### Turning Dimension Tables into Data Frames

### Variables

In [90]:
# sql queries
sql_taxi = "SELECT * FROM ds_project.taxi_dim;"
sql_weather = "SELECT * FROM ds_project.weather_dim;"
sql_stock = "SELECT * FROM ds_project.stocks_dim;"

### Function

In [91]:
def get_dataframe(user_id, pwd, host_name, sql_query):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/ds_project"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, connection);
    connection.close()
    
    return dframe

In [92]:
# turning the sql databases into dataframes

df_taxi = get_dataframe(user, password, host, sql_taxi)
df_weather = get_dataframe(user, password, host, sql_weather)
df_stocks = get_dataframe(user, password, host, sql_stock)

# renaming date column in weather dim table
df_weather.rename(columns={'datetime': 'date'}, inplace=True)

# rename 'lpep_dropoff_datetime' to date in taxi dim table
df_taxi.rename(columns={'dropoff_datetime': 'date'}, inplace=True)

# print tables
print(df_stocks.head(6))
print(df_weather.head(6))
print(df_taxi.head(6))

   stock_data_key ticker        date
0               1   AAPL  2021-07-08
1               2   AAPL  2021-07-07
2               3   AAPL  2021-07-06
3               4   AAPL  2021-07-02
4               5   AAPL  2021-07-01
5               6   TSLA  2021-07-08
   weather_data_key      name       date preciptype              conditions  \
0                 1  new york 2021-07-01       rain  Rain, Partially cloudy   
1                 2  new york 2021-07-02       rain  Rain, Partially cloudy   
2                 3  new york 2021-07-03       rain          Rain, Overcast   
3                 4  new york 2021-07-04       rain  Rain, Partially cloudy   
4                 5  new york 2021-07-05       None                   Clear   
5                 6  new york 2021-07-06       rain  Rain, Partially cloudy   

                                         description  
0        Partly cloudy throughout the day with rain.  
1        Partly cloudy throughout the day with rain.  
2  Cloudy skies throug

## Making the Fact Table

### Variables

In [93]:
df_fact_table = pd.DataFrame()

# SQL query to make fact table
create_fact_table_query = """
CREATE TABLE IF NOT EXISTS ds_project.fact_table (
    fact_data_key INT AUTO_INCREMENT PRIMARY KEY,
    date DATE,
    stock_data_key INT,
    weather_data_key INT,
    taxi_data_key INT,
    trip_distance DECIMAL(10,2),
    FOREIGN KEY (stock_data_key) REFERENCES ds_project.stocks_dim(stock_data_key),
    FOREIGN KEY (weather_data_key) REFERENCES ds_project.weather_dim(weather_data_key),
    FOREIGN KEY (taxi_data_key) REFERENCES ds_project.taxi_dim(taxi_data_key)
);
"""
execute_query(connection, create_fact_table_query)


Query successful


In [94]:
# transform date column in each dim table to all be date type
df_stocks['date'] = pd.to_datetime(df_stocks['date']).dt.date
df_weather['date'] = pd.to_datetime(df_weather['date']).dt.date
df_taxi['date'] = pd.to_datetime(df_taxi['date']).dt.date

# merge data frames together on date
merged_df = pd.merge(df_stocks, df_weather, on='date', how='outer', suffixes=('_stock', '_weather'))
merged_df = pd.merge(merged_df, df_taxi, on='date', how='outer')

# rename keys to fit fact table structure
merged_df.rename(columns={'stock_data_key': 'stock_data_key',
                          'weather_data_key': 'weather_data_key',
                          'taxi_data_key': 'taxi_data_key'
}, inplace=True)

# Connect to the database
connection = create_engine(connection_string).connect()

# only keep what is needed in fact table
fact_table_df = merged_df[['date', 'stock_data_key', 'weather_data_key', 'taxi_data_key', 'trip_distance']]

# write  fact table to sql
fact_table_df.to_sql('fact_table', con=engine, schema='ds_project', if_exists='append', index=False)

# close connection
connection.close()

# SQL Queries

In [95]:
def get_dataframe(user_id, pwd, host_name, sql_query):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/ds_project"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, connection);
    connection.close()
    
    return dframe

db_name = "ds_project"

In [96]:
sql_query1 = """SELECT weather.conditions, SUM(taxi.trip_distance) AS total_distance
FROM ds_project.fact_table AS fact
INNER JOIN ds_project.taxi_dim AS taxi ON fact.taxi_data_key = taxi.taxi_data_key
INNER JOIN ds_project.weather_dim AS weather ON fact.weather_data_key = weather.weather_data_key
GROUP BY weather.conditions;""".format(db_name)

q1 = get_dataframe(user,password,host,sql_query1)
q1

Unnamed: 0,conditions,total_distance
0,"Rain, Partially cloudy",1924.8
1,Rain,103.36
2,"Rain, Overcast",1.2
3,Clear,21.52


In [97]:
sql_q2 = """SELECT weather.conditions, weather.description, stock.ticker
FROM ds_project.fact_table AS fact
INNER JOIN ds_project.weather_dim AS weather ON fact.weather_data_key = weather.weather_data_key
INNER JOIN ds_project.stocks_dim AS stock ON fact.stock_data_key = stock.stock_data_key;
""".format(db_name)

q2 = get_dataframe(user,password,host,sql_q2)
q2

Unnamed: 0,conditions,description,ticker
0,"Rain, Partially cloudy",Becoming cloudy in the afternoon with rain.,AAPL
1,"Rain, Partially cloudy",Becoming cloudy in the afternoon with rain.,AAPL
2,"Rain, Partially cloudy",Becoming cloudy in the afternoon with rain.,AAPL
3,"Rain, Partially cloudy",Becoming cloudy in the afternoon with rain.,AAPL
4,"Rain, Partially cloudy",Becoming cloudy in the afternoon with rain.,AAPL
...,...,...,...
155,"Rain, Partially cloudy",Partly cloudy throughout the day with rain.,GOOG
156,"Rain, Partially cloudy",Partly cloudy throughout the day with rain.,GOOG
157,"Rain, Partially cloudy",Partly cloudy throughout the day with rain.,GOOG
158,"Rain, Partially cloudy",Partly cloudy throughout the day with rain.,GOOG


In [99]:
sql_q3 = """SELECT taxi.dropoff_datetime
FROM fact_table
INNER JOIN ds_project.taxi_dim AS taxi ON fact_table.taxi_data_key = taxi.taxi_data_key
GROUP BY taxi.taxi_data_key;""".format(db_name)


q3 = get_dataframe(user,password,host,sql_q3)
q3

Unnamed: 0,dropoff_datetime
0,2021-07-01 00:35:36
1,2021-07-02 06:17:23
2,2021-07-03 09:09:48
3,2021-07-04 00:00:00
4,2021-07-05 06:49:52
5,2021-07-06 07:31:03
6,2021-07-07 07:13:46
7,2021-07-08 00:00:00
