# Data Ingestion Approaches

## Approach 1: Using `pyarrow` to read Parquet and `pandas` to write to SQL in chunks

**Advantages:**
- Reads Parquet files directly. Parquet's compressed, columnar layout is efficient for large datasets.
- Allows writing to SQL in chunks, which keeps each insert small, helps manage memory during the write, and makes progress easy to monitor.

**Disadvantages:**
- Reading the whole file with `pq.read_table` requires converting the entire table to a Pandas DataFrame before writing to SQL, which can be memory-intensive for very large datasets.

## Approach 2: Using `pandas.read_csv` with `iterator=True` and `chunksize`

**Advantages:**
- Reads large CSV files in chunks, so only one chunk has to be held in memory at a time.
- Each chunk can be written to SQL as soon as it is read, which avoids memory issues with files larger than RAM.

**Disadvantages:**
- Only applicable to CSV files, not Parquet files. If your data is in Parquet format, you would need to convert it to CSV first.

## Example of Approach 2

Here is an example of how to use `pandas.read_csv` with `iterator=True` and `chunksize` to read a CSV file and write it to SQL in chunks:

```python
import pandas as pd
from sqlalchemy import create_engine

# Create a SQLAlchemy engine
engine = create_engine("postgresql://postgres:root@localhost:5433/ny_taxi")

# Read the CSV file in chunks
csv_file = "yellow_tripdata_2024-10.csv"
chunksize = 100000
csv_iterator = pd.read_csv(csv_file, iterator=True, chunksize=chunksize)

# Iterate over the chunks and write to SQL
for chunk in csv_iterator:
    chunk.to_sql("yellow_taxi_data", con=engine, if_exists="append", index=False)  # don't write the DataFrame index as a column
    print(f"Written chunk with {len(chunk)} rows")
```
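`read_csv` infers column types separately for each chunk. A column can therefore come out as `int64` in one chunk and `float64` in another (for example, when a later chunk contains a missing value), while the SQL table schema is fixed by whichever chunk creates it. A minimal sketch, assuming the column names and dtypes of the yellow-taxi Parquet schema shown later in this document, pins the types with `dtype` and `parse_dates` so every chunk matches. The `DTYPES` mapping is an illustrative subset, and an in-memory CSV and SQLite connection replace the real file and engine.

```python
import io
import sqlite3

import pandas as pd

# Illustrative subset of the yellow-taxi columns, typed to match the
# Parquet dtypes listed later in this document; extend to the full schema.
DTYPES = {"VendorID": "int32", "passenger_count": "float64", "trip_distance": "float64"}
DATE_COLUMNS = ["tpep_pickup_datetime", "tpep_dropoff_datetime"]

# Synthetic stand-in for yellow_tripdata_2024-10.csv (passenger_count missing in the last row)
csv_text = (
    "VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance\n"
    "1,2024-10-01 00:00:00,2024-10-01 00:10:00,1,1.5\n"
    "2,2024-10-01 00:05:00,2024-10-01 00:20:00,2,3.0\n"
    "2,2024-10-01 00:07:00,2024-10-01 00:09:00,,0.4\n"
)

con = sqlite3.connect(":memory:")
chunk_dtypes = []
# With chunksize set, read_csv returns a TextFileReader usable as a context manager
with pd.read_csv(io.StringIO(csv_text), dtype=DTYPES, parse_dates=DATE_COLUMNS, chunksize=2) as reader:
    for chunk in reader:
        chunk_dtypes.append(chunk.dtypes.to_dict())  # record dtypes to compare across chunks
        chunk.to_sql("yellow_taxi_data", con=con, if_exists="append", index=False)
        print(f"Written chunk with {len(chunk)} rows")
```

Without the `dtype` argument, `passenger_count` would be inferred as `int64` in the first chunk and `float64` in the second.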

## Approach 3: Using Dask to read Parquet and write to SQL in chunks

**Advantages:**
- Scales to larger-than-memory datasets by breaking them into smaller partitions and processing them in parallel.
- Integrates well with the existing Python ecosystem, including Pandas and SQLAlchemy.
- Can improve performance by parallelizing operations across multiple cores on one machine or, with a distributed cluster, across multiple nodes (for example, in the cloud).

**Disadvantages:**
- Requires an additional dependency (`dask`) and, for multi-machine processing, the setup of a cluster.

## Example of Approach 3

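Here is an example of how to use Dask to read a Parquet file and write it to SQL one partition at a time. This loop computes the partitions sequentially, so the SQL writes themselves are not parallelized: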

```python
import dask.dataframe as dd
from sqlalchemy import create_engine

# Create a SQLAlchemy engine
engine = create_engine("postgresql://postgres:root@localhost:5433/ny_taxi")

# Read the Parquet file using Dask
dask_df = dd.read_parquet("yellow_tripdata_2024-10.parquet")

# Compute each partition as a Pandas DataFrame and append it to SQL, one partition at a time
for chunk in dask_df.to_delayed():
    chunk_df = chunk.compute()
    chunk_df.to_sql("yellow_taxi_data", con=engine, if_exists="append", index=False)
    print(f"Written chunk with {len(chunk_df)} rows")
```

## Checking Memory Size

You can find out the memory size of the machine that processes the dataset using various methods depending on the operating system.

### On Linux and macOS

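On Linux, you can use the `free` command to check the memory size:

```bash
free -h
```

This command displays the total, used, and available memory in a human-readable format. macOS does not include `free`. There, `sysctl -n hw.memsize` prints the total physical memory in bytes.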

### On Windows

You can use the `systeminfo` command to check the memory size:

```powershell
systeminfo | findstr /C:"Total Physical Memory"
```

This command will display the total physical memory of the machine.

### Using Python

You can also use Python to check the memory size using the `psutil` library:

1. **Install `psutil`:**

```bash
pip install psutil
```

2. **Check the memory size using Python:**

```python
import psutil

# Get the total memory in bytes
total_memory = psutil.virtual_memory().total

# Convert to gigabytes
total_memory_gb = total_memory / (1024 ** 3)

print(f"Total memory: {total_memory_gb:.2f} GB")
```
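Total memory is most useful when compared with how much memory the data takes once loaded. A minimal sketch, assuming you have a pandas sample of the data and choose a memory budget yourself, uses `DataFrame.memory_usage(deep=True)` to estimate bytes per row and turns the budget into a chunk size. The helper name `rows_per_chunk` and the 25% budget fraction are illustrative. In practice, you could pass `psutil.virtual_memory().available` as `available_bytes`.

```python
import pandas as pd


def rows_per_chunk(sample: pd.DataFrame, available_bytes: int, budget_fraction: float = 0.25) -> int:
    """Hypothetical helper: estimate how many rows fit in a fraction of available memory."""
    # deep=True also counts the real size of object (string) columns
    bytes_per_row = sample.memory_usage(deep=True).sum() / len(sample)
    budget = available_bytes * budget_fraction
    return max(1, int(budget // bytes_per_row))  # always return at least one row


# Synthetic sample with one int64 column and one float64 column (about 16 bytes per row)
sample = pd.DataFrame({"VendorID": [1, 2] * 500, "trip_distance": [1.5, 2.5] * 500})
# Assume 8 GiB of available memory (hypothetical value)
print(rows_per_chunk(sample, available_bytes=8 * 1024**3))
```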

## Conclusion

- If your data is in Parquet format and fits in memory, the first approach using `pyarrow` and `pandas` is more suitable.
- If your data is in CSV format, the second approach using `pandas.read_csv` with `iterator=True` and `chunksize` is more efficient.
- If you need to handle larger-than-memory datasets or require parallel processing, the third approach using Dask is recommended.

Choose the approach that best fits your data format and processing requirements. If you need to handle both CSV and Parquet formats, you can implement multiple approaches and use the appropriate one based on the data format.

## Notebook: Approach 1 in practice

```python
import pandas as pd
import pyarrow.parquet as pq
from sqlalchemy import create_engine
```

```python
# https://www.nyc.gov/assets/tlc/downloads/pdf/working_parquet_format.pdf
trips = pq.read_table("yellow_tripdata_2024-10.parquet")
```

```python
trips_df = trips.to_pandas()
```

```python
# follow along with the Zoomcamp lecture
# https://www.youtube.com/watch?v=2JM-ziJt0WI&list=PL3MmuxUbc_hJed7dXYoJw8DoCuVHhGEQb&index=7
trips_df.dtypes
```

```
VendorID                          int32
tpep_pickup_datetime     datetime64[us]
tpep_dropoff_datetime    datetime64[us]
passenger_count                 float64
trip_distance                   float64
RatecodeID                      float64
store_and_fwd_flag               object
PULocationID                      int32
DOLocationID                      int32
payment_type                      int64
fare_amount                     float64
extra                           float64
mta_tax                         float64
tip_amount                      float64
tolls_amount                    float64
improvement_surcharge           float64
total_amount                    float64
congestion_surcharge            float64
Airport_fee                     float64
dtype: object
```

```python
engine = create_engine("postgresql://postgres:root@localhost:5433/ny_taxi")
```

```python
# print the CREATE TABLE statement pandas would generate for trips_df (nothing is written yet)
print(pd.io.sql.get_schema(trips_df, name='yellow_taxi_data', con=engine))
```

```sql
CREATE TABLE yellow_taxi_data (
	"VendorID" INTEGER,
	tpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE,
	tpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE,
	passenger_count FLOAT(53),
	trip_distance FLOAT(53),
	"RatecodeID" FLOAT(53),
	store_and_fwd_flag TEXT,
	"PULocationID" INTEGER,
	"DOLocationID" INTEGER,
	payment_type BIGINT,
	fare_amount FLOAT(53),
	extra FLOAT(53),
	mta_tax FLOAT(53),
	tip_amount FLOAT(53),
	tolls_amount FLOAT(53),
	improvement_surcharge FLOAT(53),
	total_amount FLOAT(53),
	congestion_surcharge FLOAT(53),
	"Airport_fee" FLOAT(53)
)
```
```python
# create a db table that includes column names only
trips_df.head(n=0).to_sql("yellow_taxi_data", con=engine, if_exists="replace")
```

```
0
```

```python
# number of full 10,000-row chunks, plus the rows left over for a final partial chunk
df_dim, iterations, last_batch = trips_df.shape, trips_df.shape[0] // 10000, trips_df.shape[0] % 10000
df_dim, iterations, last_batch
```

```
((3833771, 19), 383, 3771)
```
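The result above means 383 full 10,000-row chunks plus a final partial chunk of 3,771 rows, so the write loop performs 384 inserts in total. The same numbers can be obtained with `divmod`, and ceiling division gives the total chunk count directly:

```python
import math

n_rows, chunksize = 3_833_771, 10_000  # row count taken from trips_df.shape above

full_chunks, last_batch = divmod(n_rows, chunksize)
total_chunks = math.ceil(n_rows / chunksize)  # full chunks plus the partial one

print(full_chunks, last_batch, total_chunks)  # 383 3771 384
```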

```python
trips_df.head(10), trips_df.head()
```

```python
chunksize = 10000  # rows per INSERT batch
for chunk in range(0, len(trips_df), chunksize):
    # %time is an IPython magic that reports how long each write takes
    %time trips_df.iloc[chunk:chunk + chunksize].to_sql("yellow_taxi_data", con=engine, if_exists="append")
    print(f"Written rows {chunk} to {min(chunk + chunksize, len(trips_df)) - 1}")
```
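`%time` only works inside IPython/Jupyter. A minimal sketch of the same loop as a plain Python function uses `time.perf_counter` for timing and clamps the reported end row for the last partial chunk. The helper name `write_in_chunks` is hypothetical. An in-memory SQLite connection and a small synthetic DataFrame replace the Postgres `engine` and `trips_df`, so the sketch runs standalone. With the real data, you would call `write_in_chunks(trips_df, "yellow_taxi_data", engine)`.

```python
import sqlite3
import time

import pandas as pd


def write_in_chunks(df, table_name, con, chunksize=10_000):
    """Hypothetical helper: append `df` to `table_name` in fixed-size row slices."""
    timings = []
    for start in range(0, len(df), chunksize):
        stop = min(start + chunksize, len(df))  # the last slice may be shorter
        t0 = time.perf_counter()
        df.iloc[start:stop].to_sql(table_name, con=con, if_exists="append", index=False)
        elapsed = time.perf_counter() - t0
        timings.append(elapsed)
        print(f"Wrote rows {start} to {stop - 1} in {elapsed:.2f} s")
    return timings


# Synthetic stand-in for trips_df: 25 rows, written as chunks of 10, 10 and 5
demo_df = pd.DataFrame({"VendorID": range(25), "trip_distance": [1.0] * 25})
con = sqlite3.connect(":memory:")
timings = write_in_chunks(demo_df, "yellow_taxi_data", con, chunksize=10)
```

This sketch passes `index=False`. If you append to the table created above with `head(n=0).to_sql(...)`, which includes an `index` column, that column is left empty.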