In [1]:
import os
from dagster import asset, AssetIn, Output, WeeklyPartitionsDefinition
import pandas as pd
from pyspark.sql.dataframe import DataFrame
from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id, lit, concat

In [7]:
from datetime import datetime, timedelta

In [8]:
def generate_weekly_dates(start_date_str, end_date_str):
    start_date = datetime.strptime(start_date_str, "%Y-%m-%d")
    end_date = datetime.strptime(end_date_str, "%Y-%m-%d")
    
    current_date = start_date
    while current_date < end_date:
        yield current_date.strftime("%Y-%m-%d")
        current_date += timedelta(weeks=1)
start_date_str = "2023-01-01"
end_date_str = "2023-07-01"

weekly_dates = list(generate_weekly_dates(start_date_str, end_date_str))

In [9]:
weekly_dates

['2023-01-01',
 '2023-01-08',
 '2023-01-15',
 '2023-01-22',
 '2023-01-29',
 '2023-02-05',
 '2023-02-12',
 '2023-02-19',
 '2023-02-26',
 '2023-03-05',
 '2023-03-12',
 '2023-03-19',
 '2023-03-26',
 '2023-04-02',
 '2023-04-09',
 '2023-04-16',
 '2023-04-23',
 '2023-04-30',
 '2023-05-07',
 '2023-05-14',
 '2023-05-21',
 '2023-05-28',
 '2023-06-04',
 '2023-06-11',
 '2023-06-18',
 '2023-06-25']

In [11]:
pd.read_parquet("2023-01-01.parquet")

Unnamed: 0,fare_amount,mta_tax,improvement_surcharge,payment_type,RatecodeID,extra,tip_amount,tolls_amount,total_amount,congestion_surcharge,airport_fee,PaymentID
0,21.90,0.5,1.0,1,1.0,1.00,2.00,0.00,28.90,2.5,0.00,F202301010
1,10.70,0.5,1.0,1,1.0,3.50,2.00,0.00,17.70,2.5,0.00,F202301011
2,70.00,0.5,1.0,1,2.0,3.75,0.00,6.55,81.80,2.5,1.25,F202301012
3,17.00,0.5,1.0,2,1.0,1.00,0.00,0.00,22.00,2.5,0.00,F202301013
4,18.40,0.5,1.0,1,1.0,1.00,5.00,0.00,28.40,2.5,0.00,F202301014
...,...,...,...,...,...,...,...,...,...,...,...,...
56223,15.81,0.5,1.0,0,,0.00,2.93,0.00,22.74,,,F202301018589969083
56224,18.77,0.5,1.0,0,,0.00,2.28,0.00,25.05,,,F202301018589969084
56225,14.20,0.5,1.0,0,,0.00,4.51,0.00,22.71,,,F202301018589969085
56226,53.95,0.5,1.0,0,,0.00,11.59,0.00,69.54,,,F202301018589969086


In [2]:
spark = SparkSession.builder.master("local[*]").getOrCreate()

23/12/14 17:18:41 WARN Utils: Your hostname, TrungUbun resolves to a loopback address: 127.0.1.1; using 192.168.1.6 instead (on interface enp3s0)
23/12/14 17:18:41 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/14 17:18:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/12/14 17:18:44 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
spark_df = spark.read.parquet("bronze_yellow_record_2023-01-01.parquet", inferSchema = True, header = True)
spark_df.show()

                                                                                

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+-------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|Column1|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+-------+
|       2| 2023-01-01 00:32:10|  2023-01-01 00:40:36|            1.0|         0.97|       1.0|                 N|         161|         141|           2|        9.3

In [5]:
spark_df.cache()
select_cols = ["tpep_dropoff_datetime", "DOLocationID"]
spark_df = spark_df.select(select_cols)
spark_df = spark_df.dropDuplicates(select_cols)
specialID = concat(lit("F2023"), monotonically_increasing_id())
spark_df = spark_df.withColumn("PickUpID", specialID)
spark_df = spark_df.withColumnRenamed('tpep_dropoff_datetime','Dropoff_datetime')
spark_df.show()
spark_df.unpersist()

23/12/14 17:19:14 WARN CacheManager: Asked to cache already cached data.

+-------------------+------------+--------+
|   Dropoff_datetime|DOLocationID|PickUpID|
+-------------------+------------+--------+
|2023-01-01 01:24:47|         261|  F20230|
|2023-01-01 00:53:05|         237|  F20231|
|2023-01-01 00:48:30|          48|  F20232|
|2023-01-01 00:56:24|         255|  F20233|
|2023-01-01 00:49:41|         263|  F20234|
|2023-01-01 00:59:57|         263|  F20235|
|2023-01-01 01:00:58|         161|  F20236|
|2023-01-01 01:05:15|         263|  F20237|
|2023-01-01 00:58:45|          45|  F20238|
|2023-01-01 01:15:51|          48|  F20239|
|2023-01-01 01:09:27|         232| F202310|
|2023-01-01 01:00:50|         158| F202311|
|2023-01-01 00:12:54|         137| F202312|
|2023-01-01 01:17:49|          90| F202313|
|2023-01-01 00:35:00|         246| F202314|
|2023-01-01 00:16:00|          43| F202315|
|2023-01-01 01:13:11|         236| F202316|
|2023-01-01 00:37:32|         231| F202317|
|2023-01-01 00:16:46|         236| F202318|
|2023-01-01 00:54:31|         16

                                                                                

DataFrame[Dropoff_datetime: timestamp_ntz, DOLocationID: bigint, PickUpID: string]

In [10]:
import polars as pl
from sqlalchemy import create_engine

In [None]:
def connect_mysql(config):
    conn_info = (
            f"mysql+pymysql://{config['user']}:{config['password']}"
            + f"@{config['host']}:{config['port']}"
            + f"/{config['database']}"
    )
    db_conn = create_engine(conn_info)
    try:
        yield db_conn
    except Exception:
        print("Error occurred while connecting to MySQL")

In [22]:
import polars as pl
from sqlalchemy import create_engine

def connect_mysql(config) -> str:
    conn_info = (
        f"mysql://{config['user']}:{config['password']}"
        + f"@{config['host']}:{config['port']}"
        + f"/{config['database']}"
    )
    return conn_info

def create_mysql_engine(config):
    conn_info = connect_mysql(config)
    return create_engine(conn_info)

MYSQL_CONFIG = {
    "host": "de_mysql",
    "port": 3306,
    "database": "trip_record",
    "user": "admin",
    "password": "admin123",
}

def extract_data(sql: str, engine) -> pl.DataFrame:
    pl_data = pl.read_sql_query(sql, engine)
    return pl_data

# Create a MySQL engine
mysql_engine = create_mysql_engine(MYSQL_CONFIG)

query = "SELECT * FROM green_record;"
df_data = extract_data(query, mysql_engine)

print(df_data.shape)


ModuleNotFoundError: No module named 'MySQLdb'

In [2]:
import polars as pl
from sqlalchemy import create_engine

def connect_mysql(config) -> str:
    conn_info = (
        f"mysql:pymsql//{config['user']}:{config['password']}"
        + f"@{config['host']}:{config['port']}"
        + f"/{config['database']}"
    )
    return conn_info

MYSQL_CONFIG = {
    "host": "de_mysql",
    "port": 3306,
    "database": "trip_record",
    "user": "admin",
    "password": "admin123",
}

# Create a MySQL engine
conn_str = connect_mysql(MYSQL_CONFIG)
mysql_engine = create_engine(conn_str)

# Define your SQL query
query = "SELECT * FROM green_record;"

# Use polars to read data from MySQL
pl_data = pl.read_sql_query(query, mysql_engine)

# Close the engine after use
mysql_engine.dispose()

# Display the shape of the DataFrame
print(pl_data.shape)


ArgumentError: Could not parse SQLAlchemy URL from string 'mysql:pymsql//admin:admin123@de_mysql:3306/trip_record'

In [3]:
from datetime import datetime, timedelta
def generate_3days_dates(start_date_str, end_date_str):
    start_date = datetime.strptime(start_date_str, "%Y-%m-%d")
    end_date = datetime.strptime(end_date_str, "%Y-%m-%d")
    
    current_date = start_date
    while current_date < end_date:
        yield current_date.strftime("%Y-%m-%d")
        current_date += timedelta(days=3)
start_date_str = "2023-01-01"
end_date_str = "2023-07-01"
three_days = list(generate_3days_dates(start_date_str, end_date_str))
three_days

['2023-01-01',
 '2023-01-04',
 '2023-01-07',
 '2023-01-10',
 '2023-01-13',
 '2023-01-16',
 '2023-01-19',
 '2023-01-22',
 '2023-01-25',
 '2023-01-28',
 '2023-01-31',
 '2023-02-03',
 '2023-02-06',
 '2023-02-09',
 '2023-02-12',
 '2023-02-15',
 '2023-02-18',
 '2023-02-21',
 '2023-02-24',
 '2023-02-27',
 '2023-03-02',
 '2023-03-05',
 '2023-03-08',
 '2023-03-11',
 '2023-03-14',
 '2023-03-17',
 '2023-03-20',
 '2023-03-23',
 '2023-03-26',
 '2023-03-29',
 '2023-04-01',
 '2023-04-04',
 '2023-04-07',
 '2023-04-10',
 '2023-04-13',
 '2023-04-16',
 '2023-04-19',
 '2023-04-22',
 '2023-04-25',
 '2023-04-28',
 '2023-05-01',
 '2023-05-04',
 '2023-05-07',
 '2023-05-10',
 '2023-05-13',
 '2023-05-16',
 '2023-05-19',
 '2023-05-22',
 '2023-05-25',
 '2023-05-28',
 '2023-05-31',
 '2023-06-03',
 '2023-06-06',
 '2023-06-09',
 '2023-06-12',
 '2023-06-15',
 '2023-06-18',
 '2023-06-21',
 '2023-06-24',
 '2023-06-27',
 '2023-06-30']