# DATA WRANGLING HACKATHON

# DATABASE STEP

### Overview
This data dictionary describes High Volume FHV trip data. Each row represents a single trip in an FHV dispatched by one of NYC’s licensed High Volume FHV bases. On August 14, 2018, Mayor de Blasio signed Local Law 149 of 2018, creating a new license category for TLC-licensed FHV businesses that currently dispatch or plan to dispatch more than 10,000 FHV trips in New York City per day under a single brand, trade, or operating name, referred to as High-Volume For-Hire Services (HVFHS). This law went into effect on Feb 1, 2019.

### Objective
The main goal of this hackathon is to predict whether a client will tip.
Your submission file should be a CSV file with two columns (see the example in sample_submission.csv):
- **ID**: ID of the observation
- **Tipped**: whether the client tipped
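A minimal sketch of producing that file with pandas. The helper name `write_submission` is hypothetical, and the 0/1 encoding of `Tipped` is an assumption; check sample_submission.csv for the values it actually expects.

```python
import pandas as pd


def write_submission(ids, predictions, path="submission.csv"):
    """Write predictions in the two-column ID / Tipped layout (hypothetical helper)."""
    if len(ids) != len(predictions):
        raise ValueError("ids and predictions must have the same length")
    # Assumption: Tipped is encoded as 0/1; confirm against sample_submission.csv
    submission = pd.DataFrame({"ID": ids, "Tipped": predictions})
    # index=False keeps the file to exactly the two required columns
    submission.to_csv(path, index=False)
    return submission
```

Once a model exists, a call such as `write_submission(test_df["ID"], model.predict(X_test))` would produce the file; `test_df`, `model` and `X_test` are placeholders here.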

A dataset spread over several data sources has been provided for you. There are plenty of features, and it's up to you to use as many or as few as you want; some features will likely be more relevant than others.
Keep in mind that this is a Data Wrangling specialization. 

### Datasets:
| **Dataset** | **Information**   | **Location** |
|-------------|-------------------|---------------------|
|API          | Trip Mileage      | https://hckt02-api.lisbondatascience.org/docs#/default/get_data_data_get |
|Webpage      | Taxi Zone Data    | https://s02-infrastructure.s3.eu-west-1.amazonaws.com/hackathon-02-batch8/index.html |
|Files        | Detailed Trip Data| https://drive.google.com/drive/folders/12MhOAVrplggHVTm6-CtjqkkjI9xrVPek?usp=drive_link|
|Database     | Weather Data      | batch-s02.ctq2kxc7kx1i.eu-west-1.rds.amazonaws.com |



# psycopg2 Library

## Psycopg2 Library Overview

**Psycopg2** is a popular Python library used for interacting with PostgreSQL databases. It is known for its performance, reliability, and compliance with the Python Database API (DB-API 2.0). Psycopg2 provides a robust interface to execute SQL queries, manage transactions, and retrieve results, making it an essential tool for Python developers working with PostgreSQL.

**Key Features:**

1. Asynchronous Support: Enables non-blocking database operations for improved performance in high-concurrency applications.
2. Connection Pooling: The `psycopg2.pool` module manages reusable connections, reducing the overhead of repeatedly opening and closing them.
3. Parameterized Queries: Values are passed separately from the SQL text (`%s` placeholders) and escaped by psycopg2, which prevents SQL injection. The parameters are merged into the query on the client side; psycopg2 does not use server-side prepared statements.
4. Advanced PostgreSQL Features: Full support for PostgreSQL-specific data types (e.g., JSON, UUID, arrays) and server-side features such as named (server-side) cursors and stored procedures.
5. Transaction Management: Provides explicit control over transactions through `commit()` and `rollback()`.
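Parameterized queries and transaction management can be combined in a few lines. The sketch below assumes an open psycopg2 connection (for example from `psycopg2.connect(**db_config)`) and that `weather_data.timestamp` holds Unix epoch seconds, as the notebook output suggests; `fetch_weather_window` is a hypothetical helper, not part of psycopg2.

```python
def fetch_weather_window(connection, start_epoch, end_epoch):
    """Return weather rows with start_epoch <= timestamp < end_epoch (hypothetical helper).

    `connection` is assumed to be an open psycopg2 connection.
    """
    # %s placeholders: psycopg2 quotes and escapes the values itself,
    # so the inputs are never spliced into the SQL string by hand.
    query = (
        "SELECT timestamp, temp, prcp FROM hackathon.weather_data "
        "WHERE timestamp >= %s AND timestamp < %s AND prcp IS NOT NULL"
    )
    # In psycopg2, `with connection:` commits if the block succeeds and
    # rolls back if it raises; it does NOT close the connection.
    with connection:
        # The cursor context manager closes the cursor on exit.
        with connection.cursor() as cursor:
            cursor.execute(query, (start_epoch, end_epoch))
            return cursor.fetchall()
```

Because the connection stays open after the `with` block, the same connection can be reused for several windows and closed once at the end.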

**Advantages:**

- Efficiency: Written in C, Psycopg2 is optimized for performance, making it suitable for heavy database operations.
- Flexibility: Compatible with various versions of PostgreSQL and Python, including Python 3.
- Ease of Use: Its API simplifies common database tasks such as CRUD operations and schema management.

* **Psycopg2 is the go-to library for developers seeking a performant and reliable interface to PostgreSQL, especially in data-intensive Python applications.**

## Library

In [1]:
import psycopg2
from psycopg2.extras import RealDictCursor

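def execute_query(db_config, query, params=None):
    """Run a query: return rows (as dicts) for SELECT, otherwise commit.

    Returns None if a database error occurs.
    """
    # Defined up front so `finally` works even if connect() itself fails
    connection = None
    try:
        connection = psycopg2.connect(
            host=db_config.get("host", "localhost"),
            database=db_config["database"],
            user=db_config["user"],
            password=db_config["password"],
            port=db_config.get("port", 5432),
        )
        # RealDictCursor returns each row as a dict keyed by column name
        with connection.cursor(cursor_factory=RealDictCursor) as cursor:
            cursor.execute(query, params)
            if query.strip().lower().startswith("select"):
                return cursor.fetchall()
            connection.commit()
    except psycopg2.Error as e:
        print(f"Database error: {e}")
    finally:
        if connection is not None:
            connection.close()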



### Running queries against the database:
1. Create a file with the following JSON, replacing the values with the database credentials:

{
    "host": "localhost",
    "database": "my_database",
    "user": "my_user",
    "password": "my_secret_password",
    "port": 5432
}

2. Add the file to `.gitignore` so it is never pushed to GitHub.

3. Save the file as `.data/database/db_config.json` so the next Jupyter cell can load the connection information.
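The steps above can be backed by a small loader that fails early when a required key is missing. This is a sketch; `load_db_config`, `REQUIRED_KEYS` and `DEFAULTS` are hypothetical names, and the defaults simply mirror the `host`/`port` fallbacks used in `execute_query`.

```python
import json

# Hypothetical names; the defaults mirror execute_query's .get() fallbacks
REQUIRED_KEYS = ("database", "user", "password")
DEFAULTS = {"host": "localhost", "port": 5432}


def load_db_config(path):
    """Load DB credentials from a JSON file and fill in optional defaults."""
    with open(path) as config_file:
        config = json.load(config_file)
    missing = [key for key in REQUIRED_KEYS if key not in config]
    if missing:
        raise KeyError(f"db_config is missing required keys: {missing}")
    # Values from the file override the defaults
    return {**DEFAULTS, **config}
```

In the notebook this would replace the plain `json.load` call, e.g. `db_config_file = load_db_config(".data/database/db_config.json")`.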

### Loading database credentials

In [2]:
import json

with open(".data/database/db_config.json") as config_file:
    db_config_file = json.load(config_file)

In [3]:
# Run if library not installed
# !pip install dask-expr

In [4]:
# Connecting to the database and loading the table as a dataframe

import pandas as pd
import dask.dataframe as dd
from decimal import Decimal

db_config = db_config_file
query = "SELECT * FROM hackathon.weather_data WHERE prcp IS NOT NULL"
# The query has no %s placeholders, so there are no parameters to pass
results = execute_query(db_config, query)

# RealDictCursor rows are dicts; NUMERIC columns arrive as Decimal.
# Convert Decimal values to float and replace missing values (None) with 0.
converted_result = [
    {
        key: float(value) if isinstance(value, Decimal) else (value if value is not None else 0)
        for key, value in row.items()
    }
    for row in results
]

# Dask DataFrame split into 6 partitions; adjust according to the data size
dask_df = dd.from_pandas(pd.DataFrame(converted_result), npartitions=6)
dask_df.head()

| | timestamp | temp | rhum | prcp | wspd | pres | wdir | weco |
|---|---|---|---|---|---|---|---|---|
| 0 | 1610125000.0 | 2.8 | 52.0 | 0.0 | 11.2 | 1015.8 | 344.0 | Cloudy |
| 1 | 1610129000.0 | 3.3 | 50.0 | 0.0 | 11.2 | 1014.7 | 270.0 | Cloudy |
| 2 | 1610132000.0 | 3.9 | 57.0 | 0.0 | 0.0 | 1013.7 | 0.0 | Cloudy |
| 3 | 1610136000.0 | 3.9 | 57.0 | 0.0 | 7.6 | 1013.6 | 311.0 | Cloudy |
| 4 | 1610140000.0 | 3.9 | 55.0 | 0.0 | 5.4 | 1013.0 | 2.0 | Cloudy |
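The conversion in the cell above replaces every missing value with 0, which, for example, turns a missing wind speed into calm air. A minimal alternative sketch keeps missing values as NaN so that imputation can be decided later; `rows_to_frame` is a hypothetical helper, and which columns are numeric is an assumption you pass in.

```python
from decimal import Decimal

import pandas as pd


def rows_to_frame(rows, numeric_cols):
    """Build a DataFrame from dict rows, casting Decimal columns to float.

    Unlike the cell above, missing values become NaN instead of 0.
    """
    df = pd.DataFrame(rows)
    for col in numeric_cols:
        # Decimal (or any number) -> float; None -> NaN
        df[col] = df[col].map(lambda v: float(v) if v is not None else float("nan"))
    return df
```

Usage would look like `rows_to_frame(results, ["timestamp", "temp", "rhum", "prcp", "wspd", "pres", "wdir"])`, leaving `weco` untouched.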


## BRONZE Layer

In [5]:
dask_df.to_parquet(".data/database/bronze/", write_index=False, engine="pyarrow")

In [6]:
dask_df2 = dd.read_parquet(".data/database/bronze/")

In [17]:
import pandas as pd

# Convert the Unix epoch timestamps (in seconds) to datetime in each partition
dask_df2['timestamp'] = dask_df2['timestamp'].map_partitions(
    lambda series: pd.to_datetime(series, unit='s')
)
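The `unit` argument decides how the raw numbers are read. The weather timestamps are Unix epoch seconds (1610125200 is 2021-01-08 17:00:00 UTC, the first time shown in the converted output), so `unit='s'` is needed; reading the same numbers as nanoseconds lands about 1.6 seconds after 1970-01-01. A small pandas check:

```python
import pandas as pd

# Two consecutive hourly epoch values, in seconds
epoch_seconds = pd.Series([1610125200.0, 1610128800.0])

as_seconds = pd.to_datetime(epoch_seconds, unit="s")
as_nanoseconds = pd.to_datetime(epoch_seconds, unit="ns")

print(as_seconds.iloc[0])        # 2021-01-08 17:00:00
print(as_nanoseconds.iloc[0])    # a 1970-01-01 timestamp, not 2021
```

Dask also provides `dd.to_datetime`, which can replace the `map_partitions` call. Note that the resulting datetimes are time-zone naive but represent UTC.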

In [18]:
dask_df2.head(100)

| | timestamp | temp | prcp |
|---|---|---|---|
| 0 | 2021-01-08 17:00:00 | 2.8 | 0.0 |
| 1 | 2021-01-08 18:00:00 | 3.3 | 0.0 |
| 2 | 2021-01-08 19:00:00 | 3.9 | 0.0 |
| 3 | 2021-01-08 20:00:00 | 3.9 | 0.0 |
| 4 | 2021-01-08 21:00:00 | 3.9 | 0.0 |
| ... | ... | ... | ... |
| 95 | 2021-01-12 16:00:00 | 4.4 | 0.0 |
| 96 | 2021-01-12 17:00:00 | 6.7 | 0.0 |
| 97 | 2021-01-12 18:00:00 | 7.8 | 0.0 |
| 98 | 2021-01-12 19:00:00 | 8.3 | 0.0 |


In [19]:
row_count = dask_df2.shape[0].compute()

In [20]:
print(row_count)

8788



## Keeping only the columns we think might affect the ML model's results

In [22]:
cols = ['timestamp', 'temp', 'prcp']
dask_df2 = dask_df2[cols]

In [23]:
dask_df2.head()

| | timestamp | temp | prcp |
|---|---|---|---|
| 0 | 2021-01-08 17:00:00 | 2.8 | 0.0 |
| 1 | 2021-01-08 18:00:00 | 3.3 | 0.0 |
| 2 | 2021-01-08 19:00:00 | 3.9 | 0.0 |
| 3 | 2021-01-08 20:00:00 | 3.9 | 0.0 |
| 4 | 2021-01-08 21:00:00 | 3.9 | 0.0 |


In [25]:
print(dask_df2.dtypes)

timestamp    datetime64[ns]
temp                float64
prcp                float64
dtype: object
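Before writing the silver layer, it can help to assert that the schema is exactly what the printed dtypes show, so an upstream change (for example a timestamp left as float) fails loudly. A minimal sketch; `SILVER_SCHEMA` and `check_schema` are hypothetical names, and the expected dtypes are copied from the output above.

```python
import pandas as pd

# Expected column -> dtype mapping, copied from the dtypes printed above
SILVER_SCHEMA = {"timestamp": "datetime64[ns]", "temp": "float64", "prcp": "float64"}


def check_schema(df, schema=SILVER_SCHEMA):
    """Raise TypeError if the set of columns or their dtypes differ from `schema`."""
    # df.dtypes is a pandas Series for both pandas and Dask DataFrames
    actual = {col: str(dtype) for col, dtype in df.dtypes.items()}
    if actual != schema:
        raise TypeError(f"schema mismatch: expected {schema}, got {actual}")
    return True
```

Calling `check_schema(dask_df2)` just before `to_parquet` only inspects metadata, so it does not trigger a Dask computation.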


## SILVER Layer

In [29]:
dask_df2.to_parquet(".data/database/silver/", write_index=False, engine="pyarrow")

In [30]:
dask_df3 = dd.read_parquet(".data/database/silver/")
print(dask_df3.head())

            timestamp  temp  prcp
0 2021-01-08 17:00:00   2.8   0.0
1 2021-01-08 18:00:00   3.3   0.0
2 2021-01-08 19:00:00   3.9   0.0
3 2021-01-08 20:00:00   3.9   0.0
4 2021-01-08 21:00:00   3.9   0.0


## FINISHED DATABASE COMPUTATION

**Conclusion:** Now we'll work on other data sources to bring everything together and create a single file for the ML model training.
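Since the weather table is hourly, one way to bring it together with trip-level data is to truncate each pickup time to the hour and left-join. This is a sketch under assumptions: `attach_hourly_weather` is hypothetical and `pickup_datetime` is a hypothetical trip column name. Also verify time zones before joining: the weather times come from epoch seconds (UTC), while trip timestamps may be recorded in local New York time.

```python
import pandas as pd


def attach_hourly_weather(trips, weather, pickup_col="pickup_datetime"):
    """Left-join hourly weather onto trips by flooring pickup time to the hour.

    `pickup_col` is a hypothetical column name; both sides must share a time zone.
    """
    trips = trips.copy()
    # "60min" floors to the start of the hour
    trips["weather_hour"] = trips[pickup_col].dt.floor("60min")
    weather = weather.rename(columns={"timestamp": "weather_hour"})
    # how="left" keeps every trip, even when no weather row matches its hour
    return trips.merge(weather, on="weather_hour", how="left")
```

A left join keeps the trip count unchanged, which matters because every trip needs a prediction in the submission file.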