In [98]:
from datetime import datetime
import requests
import clickhouse_connect
import logging
import sys
import boto3
from botocore.exceptions import ClientError, EndpointConnectionError
import pandas as pd
from io import BytesIO, StringIO
import gzip
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.functions import col, when, upper, trim
from pyspark.sql.types import DecimalType
import socket
import geopandas as gpd

In [97]:
logging.basicConfig(level=logging.INFO, stream=sys.stdout)

bronze_storage = {'path': 'http://localhost:9000/', 'user': 'minioadmin', 'pass': 'minioadmin', 'bucket': 'nyc-taxi-raw-data'}
clickhouse_storage = {'path': 'http://localhost:9000/', 'user': 'minioadmin', 'pass': 'minioadmin', 'bucket': 'nyc-taxi-db'}

clickhouse_host = 'def-clickhouse'
clickhouse_port = '8123'
clickhouse_user = 'default'
clickhouse_password = 'admin'
clickhouse_staging_db_name = 'nyc_taxi_staging'
clickhouse_silver_db_name = 'nyc_taxi_silver'
clickhouse_gold_db_name = 'nyc_taxi_datamarts'

raw_data_base_link = 'https://d37ci6vzurychx.cloudfront.net/trip-data/'

lookup_table_file_name = 'taxi_zone_lookup.csv'
lookup_table_file_url = 'https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv'
shape_file_name = 'taxi_zones.zip'
shape_file_url = 'https://d37ci6vzurychx.cloudfront.net/misc/taxi_zones.zip'

In [5]:
def check_db_existance(ch_client, db_name):
    """
    Checks if database with db_name exists
    """
    sql = f"""
        SELECT COUNT(*)
            FROM system.databases
        WHERE name='{db_name}'
    """

    try:
        result = bool(ch_client.query(sql).result_rows[0][0])
        logging.info(f'Database {db_name} exists: {result}')
        return result
    except Exception as e:
        logging.error(f'Error checking database: {e}')
    
    return False

In [6]:
def create_db(ch_client, db_name):
    """
    Creates tabase with db_name
    """
    sql= f"""
        CREATE DATABASE iF NOT EXISTS {db_name}
    """
    
    try:
        ch_client.command(sql)
        logging.info(f'Database {db_name} created successfully')
    except Exception as e:
        logging.error(f'Error creating database: {e}')

    return check_db_existance(ch_client, db_name)

In [7]:
def check_storage(storage_name):
    """
    Checks if storage exists and bucket is accessible with provided credentials and tries to create if it doesn't exist
    """
    try:
        s3 = boto3.client(
            's3',
            endpoint_url=storage_name['path'],
            aws_access_key_id=storage_name['user'],
            aws_secret_access_key=storage_name['pass']
        )

        s3.head_bucket(Bucket=storage_name['bucket'])
        logging.info(f"Bucket {storage_name['bucket']} is accessible.")

        return True

    except EndpointConnectionError as e:
        logging.error(f'Cannot connect to endpoint: {e}')
    except ClientError as e:
        logging.error(f'S3 error: {e}')

        # If bucket doesn't exist - try to create
        if int(e.response['Error']['Code']) == 404:
            logging.error(f"Bucket {storage_name['bucket']} not found — creating...")

            try:
                s3.create_bucket(Bucket=storage_name['bucket'])
                print(f"Bucket {storage_name['bucket']} created.")
            except Exception as ee:
                logging.error(f"Cannot create bucket {storage_name['bucket']}. Error: {ee}")

    except Exception as e:
        logging.error(f'Error checking storage: {e}')

    return False

In [8]:
# Check existance of necessary storages
for storage in [bronze_storage, clickhouse_storage]:
    check_storage(storage)


INFO:root:Bucket nyc-taxi-raw-data is accessible.
INFO:root:Bucket nyc-taxi-db is accessible.


In [9]:
# Try to connect to ClickHouse instance
try:
    clickhouse_client = clickhouse_connect.get_client(
        host=clickhouse_host,
        port=clickhouse_port,
        username=clickhouse_user,
        password=clickhouse_password)
    logging.info(f'Connected to Clickhouse successfully.')
except Exception as e:
    logging.error(f'Error connecting to Clickhouse: {e}')
    

INFO:root:Connected to Clickhouse successfully.


In [10]:
def create_staging_schema(ch_client, db_name):
    """
    Creates neccessary tables for staging layer, if they don't exist
    """
    logging.info(f'Creating schema for staging layer (database {db_name}), if not exists')
    sql = f"""
        CREATE TABLE IF NOT EXISTS {db_name}.bronze_files(
            file_name   String NOT NULL,
            downloaded_dt  Datetime DEFAULT now(),
            source_url  String NOT NULL,
            processed   Boolean DEFAULT False,
            processed_dt    Datetime DEFAULT NULL
        )
        ENGINE = ReplacingMergeTree()
        ORDER BY file_name
        SETTINGS enable_block_number_column = 1,
            enable_block_offset_column = 1;
    """

    try:
        ch_client.command(sql)
    except Exception as e:
        logging.error(f'Error creating schema for database {db_name}: {e}')

In [11]:
def create_silver_schema(ch_client, db_name):
    """
    Creates neccessary tables for silver layer, if they don't exist
    """
    # List of SQL scripts to run
    sql_scripts = ['silver.sql', 'populate_vendors.sql', 'populate_rates.sql', 'populate_payment_types.sql']

    # Select silver layer database
    try:
        ch_client.command(f'USE {db_name};')
    except Exception as e:
        logging.error(f'Error selecting database {db_name}: {e}')
    
    # For each script load it from file and execute
    for script in sql_scripts:
        with open(script, 'r', encoding='utf-8') as file:
            sql = file.read()


        # Get individual commands from script
        for cmd in sql.split(';'):
            cmd = cmd.strip()
            if cmd:
                try:
                    # Execute command
                    ch_client.command(cmd)
                except Exception as e:
                    logging.error(f'Error creating silver layer schema: {e}')


In [12]:
def create_gold_schema(ch_client, db_name):
    """
    Creates neccessary tables for gold layer, if they don't exist
    """
    pass

In [13]:
def check_schema(ch_client, db_name):
    logging.info(f'Checking database {db_name} schema')

    if db_name == clickhouse_staging_db_name:
        create_staging_schema(ch_client, db_name)
    elif db_name == clickhouse_silver_db_name:
        create_silver_schema(ch_client, db_name)
    elif db_name == clickhouse_gold_db_name:
        create_gold_schema(ch_client, db_name)
    else:
        logging.error(f'Wrong database name!')

In [14]:
# Check existance of necessary databases
for db in [clickhouse_staging_db_name, clickhouse_silver_db_name, clickhouse_gold_db_name]:
    # If database doesn't exist - create database
    if not check_db_existance(clickhouse_client, db):
        logging.info(f'Database {db} not found.')
        if not create_db(clickhouse_client, db):
            logging.error('Cannot create database!')

    # Check database schema
    check_schema(clickhouse_client, db)
    

INFO:root:Database nyc_taxi_staging exists: True
INFO:root:Checking database nyc_taxi_staging schema
INFO:root:Creating schema for staging layer (database nyc_taxi_staging), if not exists
INFO:root:Database nyc_taxi_silver exists: True
INFO:root:Checking database nyc_taxi_silver schema
INFO:root:Database nyc_taxi_datamarts exists: True
INFO:root:Checking database nyc_taxi_datamarts schema


In [15]:
def get_downloaded_files(ch_client, db_name, file_name=''):
    """
    Get list of already downloaded files from database
    (may be different from list of files in the bucket)
    If file_name parameter is present - returns only corresponding records
    """

    sql = f"""
        SELECT file_name
            FROM {db_name}.bronze_files
    """
    
    # If file_name is present - add filter by file name
    if file_name:
        sql += f" WHERE file_name = '{file_name}'"

    try:
        result = ch_client.query(sql).result_rows
        files_list = [r[0] for r in result]

        return files_list
        
    except Exception as e:
        logging.error(f'Error getting downloadded files list: {e}')
    


In [16]:
def write_file_info_to_db(ch_client, db_name, file_name, file_url):
    """
    Writes or updates information on downloaded file into staging database table
    Table bronze_files uses ReplacingMergeTree
    """
    
    sql = f"""
        INSERT INTO {db_name}.bronze_files (file_name, source_url)
            VALUES ('{file_name}', '{file_url}')
    """

    try:
        ch_client.command(sql)
        logging.info(f'File {file_name} information saved to database {db_name}')
    except Exception as e:
        logging.error(f'Error while saving file {file_name} information to database {db_name}: {e}')
    

In [17]:
def download_file(file_url, storage):
    """
    Downloads file from given URL and saves to given storage
    """
    try:
        headers = {
            'Referer': 'https://www.nyc.gov/',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/141.0.0.0 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8',
            #'Accept-encoding': 'gzip, deflate, br, zstd',
        }
        
        with requests.get(file_url, headers=headers, stream=True) as file:
            # If status is not 2xx - raise error
            file.raise_for_status()

            # Extract file name from URL
            file_name = file_url.split('/')[-1]

            try:
                s3 = boto3.client(
                    's3',
                    endpoint_url=storage['path'],
                    aws_access_key_id=storage['user'],
                    aws_secret_access_key=storage['pass']
                )
                s3.upload_fileobj(file.raw, storage['bucket'], file_name)
                logging.info(f"File {file_name} uploaded to storage {storage['path']}{storage['bucket']}")

                return True
            except Exception as ee:
                    logging.error(f"Error saving file {file_name} to storage {storage['path']}{storage['bucket']}. Error: {ee}")
        
    except Exception as e:
        logging.error(f'Error downloading file {file_url}: {e}')
    
    return False

In [18]:
def mark_file_processed(ch_client, db_name, file_name):
    """
    Set field 'processed' in table bronze_files as True for given file
    The table uses ReplacingMergeTree engine, so we use INSERT to update
    """

    sql = f"""
        ALTER TABLE {db_name}.bronze_files
        UPDATE processed = True,
            processed_dt = now()
        WHERE file_name = '{file_name}';
    """

    try:
        ch_client.command(sql)
        logging.info(f'File {file_name} marked as processed.')
    except Exception as e:
        logging.error(f'Error marking file {file_name} as processed: {e}')

In [19]:
def insert_taxi_zones_into_silver(ch_client, file_name, storage, db_name):
    """
    Insert the data from file with lookuptaxi zones lookup table
    into h_taxi_zone hub
    """
    
    logging.info(f'Inserting data from file {file_name} into silver layer.')

    try:
        s3 = boto3.client(
            's3',
            endpoint_url=storage['path'],
            aws_access_key_id=storage['user'],
            aws_secret_access_key=storage['pass']
        )

        file = s3.get_object(Bucket=storage['bucket'], Key=file_name)

        # Make panadas dataframe from csv for further processing
        csv_data = file['Body'].read()
        df = pd.read_csv(BytesIO(csv_data))

        # Drop duplicates
        df = df.drop_duplicates(subset=['LocationID'])
        # Add 'source' field
        df['source'] = 'Data Dictionary – Yellow Taxi Trip Records - March 18, 2025'
        # Cast data types just in case
        df['LocationID'] = df['LocationID'].astype(int)
        df['Borough'] = df['Borough'].astype(str)
        df['Zone'] = df['Zone'].astype(str)
        df['service_zone'] = df['service_zone'].astype(str)

        # Insert into hub
        data_to_insert = [tuple(x) for x in df[['LocationID', 'source']].to_numpy()]
        ch_client.insert(f'{db_name}.hub_taxi_zone', data_to_insert, column_names=['zone_id', 'record_source'])

        # Insert into satellite
        data_to_insert = [tuple(x) for x in df[['LocationID', 'Borough', 'Zone', 'service_zone', 'source']].to_numpy()]
        ch_client.insert(f'{db_name}.sat_taxi_zone_details', data_to_insert, column_names=['taxi_zone_id', 'borough', 'zone', 'service_zone', 'record_source'])
        
        # Mark lookup file as processed
        mark_file_processed(ch_client, clickhouse_staging_db_name, file_name)
        logging.info(f'Taxi zones uploaded into database {db_name}')

    except Exception as e:
        logging.error(f'Error loading taxi zones data into database {db_name}: {e}')



In [20]:
# Download lookup table if it is not downloaded already
if not len(get_downloaded_files(clickhouse_client, clickhouse_staging_db_name, lookup_table_file_name)):
    # Don't use function download_file for this file as it uses streming download
    try:
        response = requests.get(lookup_table_file_url)
        response.raise_for_status()

        s3 = boto3.client(
            's3',
            endpoint_url=bronze_storage['path'],
            aws_access_key_id=bronze_storage['user'],
            aws_secret_access_key=bronze_storage['pass']
        )

        s3.upload_fileobj(BytesIO(response.content), bronze_storage['bucket'], lookup_table_file_name)
        logging.info(f"File {lookup_table_file_name} uploaded to storage {bronze_storage['path']}{bronze_storage['bucket']}")
        
        write_file_info_to_db(clickhouse_client, clickhouse_staging_db_name, lookup_table_file_name, lookup_table_file_url)
    except Exception as e:
        logging.error(f'Error downloading lookup table {lookup_table_file_url}: {e}')
else:
    logging.info(f'Taxi zones lookup table is already downloaded')

INFO:root:Taxi zones lookup table is already downloaded


In [73]:
# Upload the data into silver layer
insert_taxi_zones_into_silver(clickhouse_client, lookup_table_file_name, bronze_storage, clickhouse_silver_db_name)

INFO:root:Inserting data from file taxi_zone_lookup.csv into silver layer.
INFO:root:File taxi_zone_lookup.csv marked as processed.
INFO:root:Taxi zones uploaded into database nyc_taxi_silver


In [99]:
# Download taxi zones shape file if it is not downloaded already
try:
    s3 = boto3.client(
        's3',
        endpoint_url=bronze_storage['path'],
        aws_access_key_id=bronze_storage['user'],
        aws_secret_access_key=bronze_storage['pass']
    )

    s3.head_object(Bucket=bronze_storage['bucket'], Key=shape_file_name)
    logging.info(f'Shape file {shape_file_name} already exists')
except ClientError as e:
    if e.response['Error']['Code'] == '404':
        logging.error(f"Shape file {shape_file_name} doesn't exist, downloading")       
        download_file(shape_file_url, bronze_storage)
    else:
        logging.error(f"Error checking shape file {shape_file_name} in storage {bronze_storage['path']}{bronze_storage['bucket']}: {e}")


INFO:root:Shape file taxi_zones.zip already exists


In [79]:
# Download raw data file for each month from the last upload
start_date = datetime.strptime('2009-01-01', '%Y-%m-%d')
start_year = int(start_date.year)
start_month = int(start_date.month)

end_date = datetime.now()
end_year = int(end_date.year)
end_month = int(end_date.month)

# Construct a file name for each month and download it
tmp_counter = 0

# Getting the list of already downloaded files
# Checking all files every time the DAG runs allows to download them in any order
# and not rely on specific order (as if getting maximum date etc)
# Keeping information in the table instead of checking existance in the bucket
# allows to drop older processed files wothout them being downloaded again
downloaded_files_list = get_downloaded_files(clickhouse_client, clickhouse_staging_db_name)

for year in range(start_year, end_year + 1):
    for month in range(start_month, 13):
        raw_data_file_name = f'yellow_tripdata_{year}-{month:02d}.parquet'
        raw_data_url = raw_data_base_link + raw_data_file_name

        # If that file has already been downloaded - get next one
        if raw_data_file_name in downloaded_files_list:
             continue;

        logging.info(f'Downloading file {raw_data_url}')
        if download_file(raw_data_url, bronze_storage):
            write_file_info_to_db(clickhouse_client, clickhouse_staging_db_name, raw_data_file_name, raw_data_url)

        tmp_counter += 1
        if tmp_counter >= 1:
            break

    break

    # For all years except start_year start_month = 1
    start_month = 1


INFO:root:Downloading file https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2009-05.parquet
INFO:root:File yellow_tripdata_2009-05.parquet uploaded to storage http://localhost:9000/nyc-taxi-raw-data
INFO:root:File yellow_tripdata_2009-05.parquet information saved to database nyc_taxi_staging


In [22]:
def get_unprocessed_files_list(ch_client, db_name):
    """
    Returns list of downloaded, but unprocessed raw data files
    """
    sql = f"""
        SELECT file_name
            FROM {db_name}.bronze_files
        WHERE processed == False
    """

    try:
        result = ch_client.query(sql).result_rows
        files_list = [r[0] for r in result]

        return files_list

    except Exception as e:
        logging.error(f'Error getting unprocessed files list from database {db_name}')

In [23]:
def drop_table(ch_client, db_name, table_name):
    """
    Drops table with table_name in database db_name
    """

    sql = f"""
        DROP TABLE IF EXISTS {db_name}.{table_name}
    """

    try:
        ch_client.command(sql)
    except Exception as e:
        logging.error(f'Error while dropping table {table_name} from database {db_name}: {e}')

In [24]:
def bronze_to_staging(ch_client, db_name, file_name, storage):
    """
    Import data from raw data file into staging table
    Get all columns of data file and add load_timestamp and record_source fields
    """

    # Drop target table before uploading data
    # instead of truncating it
    # This approach gives opportunity for schema changes in raw data files
    drop_table(ch_client, db_name, 'staging_data')

    storage = {'path': 'http://def-minio:9000/', 'user': 'minioadmin', 'pass': 'minioadmin', 'bucket': 'nyc-taxi-raw-data'}

    # Create table from raw data file
    sql = f"""
        CREATE TABLE {db_name}.staging_data
        ENGINE = Log
        AS
        SELECT DISTINCT *, now() AS load_timestamp, \'{storage['path']}{storage['bucket']}/{file_name}\' AS record_source
            FROM s3(
                \'{storage['path']}{storage['bucket']}/{file_name}\',
                'minioadmin',
                'minioadmin',
                'Parquet'
            )
    """

    try:
        ch_client.command(sql)
        logging.info(f'Data from file {file_name} successfully uploaded to database {db_name}')
    except Exception as e:
        logging.error(f'Error while uploading data from file {file_name} into database {db_name}: {e}')

In [None]:
def get_zone_ids(df, spark):
    """
    Find zone id in shape file by coordinates
    """
    logging.info(f'Finding zone ids in shape file')

    try:
        s3 = boto3.client(
            's3',
            endpoint_url=bronze_storage['path'],
            aws_access_key_id=bronze_storage['user'],
            aws_secret_access_key=bronze_storage['pass']
        )

        # Get zip file from storage
        shp_zip_file_name = shape_file_url.split('/')[-1]
        print(shp_zip_file_name)
        obj = s3.get_object(Bucket=bronze_storage['bucket'], Key=shp_zip_file_name)
        zip_data = BytesIO(obj['Body'].read())

        # Read shapes from zip
        zones = gpd.read_file(f'zip://{zip_data}')
        # Convert CRS
        zones = zones.to_crs(epsg=4326)
        # Read shape file
        print(zones)
        return
        zones = spark.read.format('geospark') \
            .option('geomField', 'geometry') \
            .load(f"s3a:///{bronze_storage['bucket']}/{shape_file_name}")
        print('c')
        # Transform coordinates in dataframe into geometry
        df = df.withColumn('pickup_geom', \
            expr('ST_Point(cast(start_lon as Decimal(9,6)), cast(start_lat as Decimal(9,6)))')) \
            .withColumn('dropoff_geom', \
            expr('ST_Point(cast(end_lon as Decimal(9,6)), cast(end_lat as Decimal(9,6)))'))
        print('d')
    except Exception as e:
        logging.error(f"Error loading shape file {shape_file_name} in storage {bronze_storage['path']}{bronze_storage['bucket']}: {e}")

    return df

In [96]:
staging_to_silver(clickhouse_staging_db_name, clickhouse_silver_db_name)

INFO:root:Connection to database nyc_taxi_staging successful. Processing data for silver layer.
a
INFO:root:Finding zone ids in shape file
ERROR:root:Error loading shape file taxi_zones.shp in storage http://localhost:9000/nyc-taxi-raw-data: '/vsizip/<_io.BytesIO object at 0x0000023B19A84950>' does not exist in the file system, and is not recognized as a supported dataset name.
b


0

In [31]:
def drop_outbursts(df, column_name):
    """
    Drop rows outside 1.5 IQR
    """
    # Calculate quantiles
    q1, q3 = df.approxQuantile(column_name, [0.25, 0.75], 0.01)
    iqr = q3 - q1

    # Calculate boundaries
    lower = q1 - 1.5 * iqr
    upper = q3 + 1.5 * iqr

    # Drop rows outside boudaries
    df_clean = df.filter((F.col(column_name) >= lower) & (F.col(column_name) <= upper))
    
    logging.info(f'Deleted outbursts for column {column_name}')

    return df_clean

In [88]:
def staging_to_silver(staging_db, silver_db):
    """
    Process data from staging table and insert into silver layer
    """
    # Process staging data using Spark
    try:
        spark.stop()
    except:
        pass

    spark = SparkSession.builder \
        .appName('Staging2Silver') \
        .master('spark://spark-master:7077') \
        .config('spark.driver.host', 'host.docker.internal') \
        .config('spark.driver.bindAddress', '0.0.0.0') \
        .config("spark.jars.packages",
            "org.apache.sedona:sedona-spark-shaded-3.5_2.12:1.6.1,"
            "org.datasyslab:geotools-wrapper:geotools-24.1") \
        .getOrCreate()

    try:
        df = spark.read \
            .format('jdbc') \
            .option('url', f'jdbc:clickhouse://{clickhouse_host}:{clickhouse_port}/{clickhouse_staging_db_name}') \
            .option('driver', 'com.clickhouse.jdbc.ClickHouseDriver') \
            .option('dbtable', 'staging_data') \
            .option('user', f'{clickhouse_user}') \
            .option('password', f'{clickhouse_password}') \
            .load()
        logging.info(f'Connection to database {staging_db} successful. Processing data for silver layer.')
        #df.show(5)
        print('a')
        get_zone_ids(df, spark)
        print('b')
        return 0
        logging.info(f'Rows before cleaning: {df.count()}')

        # By default suppose that dataframe doesn't contain taxi zone ids
        df_has_location_ids = False

        # Check columns one by one
        # Schema and semantics of data have changed over time
        for column in df.columns:
            column_name = column.lower()

            # Check if column contains LocationID - can be PULocationID and DOLocationID
            if 'locationid' in column_name:
                df_has_location_ids = True
                # Drop rows with null and negative values and values greater than 500
                df = df.filter(col(column)).isNotNull & (col(column) > 0) & (col(column) <= 500)

            # In early years 'vendor' column was named vendor_name and contained string codes
            if column_name == 'vendor_name':
                # Replace vendor string code to index and covert to int type
                df = df.withColumn(column, \
                    when(upper(trim(col(column))) == 'CMT', 1) \
                    .when(upper(trim(col(column))) == 'VTS', 101) \
                    .when(upper(trim(col(column))) == 'DDS', 102) \
                    .otherwise(999) \
                    .cast('int'))
                # Rename column
                df = df.withColumnRenamed(column, 'vendor_id')
            
            # Later 'vendor' column was named vendorid and contained integer ids
            elif column_name == 'vendorid':
                # Convert columnt to int
                df = df.withColumn(column, \
                    col(column).cast('int'))
                # Rename column
                df = df.withColumnRenamed(column, 'vendor_id')
            
            # Pickup datetime
            elif 'pickup_datetime' in column_name:
                # Drop rows with null
                df = df.filter(col(column).isNotNull())
                # Convert to datetime
                df = df.withColumn(column, \
                    col(column).cast('timestamp'))
                # Rename column
                df = df.withColumnRenamed(column, 'pickup_datetime')

            # Dropoff datetime
            elif 'dropoff_datetime' in column_name:
                # Drop rows with null
                df = df.filter(col(column).isNotNull())
                # Convert to datetime
                df = df.withColumn(column, \
                    col(column).cast('timestamp'))
                # Rename column
                df = df.withColumnRenamed(column, 'dropoff_datetime')

            # Passenger count - can be 0 or null
            elif 'passenger' in column_name:
                # Drop negative values and values greater than 10
                df = df.filter(col(column).isNull() | ((col(column).cast('int') >= 0) & (col(column).cast('int') <= 10)))
                # Convert to int
                df = df.withColumn(column, \
                    col(column).cast('int'))
                # Rename column
                df = df.withColumnRenamed(column, 'passenger_count')

            # Trip distance - can be 0 or null
            elif 'distance' in column_name:
                # Drop negative values and values greater than 1000 miles
                df = df.filter(col(column).isNull() | ((col(column).cast('int') >= 0) & (col(column).cast('int') <= 1000)))
                # Convert to decimal
                df = df.withColumn(column, \
                    col(column).cast(DecimalType(6, 2)))
                # Rename column
                df = df.withColumnRenamed(column, 'trip_distance')

            # Rate id
            elif 'rate' in column_name:
                # Drop negative values
                df = df.filter(col(column).isNull() | (col(column).cast('int') >= 0))
                # Convert to int
                df = df.withColumn(column, \
                    col(column).cast('int'))
                # Rename column
                df = df.withColumnRenamed(column, 'rate_id')

            # Store and forward flag
            elif 'store' in column_name:
                # Convert to bool
                df = df.withColumn(column, \
                    when(upper(trim(col(column))) == 'Y', 1) \
                    .when(upper(trim(col(column))) == 'N', 0) \
                    .otherwise(None)
                    .cast('boolean'))
                # Rename column
                df = df.withColumnRenamed(column, 'store_and_fwd')

            # Payment type - can be string or int id
            elif 'payment' in column_name:
                # Replace strings for indexes
                df = df.withColumn(column, \
                    when(upper(trim(col(column))) == 'CREDIT', 1) \
                    .when(upper(trim(col(column))) == 'CRD', 1) \
                    .when(upper(trim(col(column))) == 'CASH', 2) \
                    .when(upper(trim(col(column))) == 'CSH', 2) \
                    .when(upper(trim(col(column))) == 'NO CHARGE', 3) \
                    .when(upper(trim(col(column))) == 'NOC', 3) \
                    .when(upper(trim(col(column))) == 'DISPUTE', 4) \
                    .when(upper(trim(col(column))) == 'DIS', 4) \
                    .when(upper(trim(col(column))) == 'UNK', 5) \
                    .when(col(column).isNull(), 5) \
                    .cast('int'))
                # Rename column
                df = df.withColumnRenamed(column, 'payment_type_id')

            # Fare amount
            elif 'fare_am' in column_name:
                # Drop negative values
                df = df.filter(col(column).isNull() | (col(column).cast('int') >= 0))
                # Convert to decimal
                df = df.withColumn(column, \
                    col(column).cast(DecimalType(6, 2)))
                # Rename column
                df = df.withColumnRenamed(column, 'fare_amount')

            # Surcharges
            elif 'extra' in column_name or 'surcharge' in column_name:
                # Drop negative values
                df = df.filter(col(column).isNull() | (col(column).cast('int') >= 0))
                # Convert to decimal
                df = df.withColumn(column, \
                    col(column).cast(DecimalType(6, 2)))
                # Rename column
                df = df.withColumnRenamed(column, 'extra')

            # MTA tax
            elif 'mta_tax' in column_name:
                # Drop negative values
                df = df.filter(col(column).isNull() | (col(column).cast('int') >= 0))
                # Convert to decimal
                df = df.withColumn(column, \
                    col(column).cast(DecimalType(5, 2)))
                # Rename column
                df = df.withColumnRenamed(column, 'mta_tax')

            # Tip amount (only for credit cards)
            elif 'tip_am' in column_name:
                # Drop negative values
                df = df.filter(col(column).isNull() | (col(column).cast('int') >= 0))
                # Convert to decimal
                df = df.withColumn(column, \
                    col(column).cast(DecimalType(6, 2)))
                # Rename column
                df = df.withColumnRenamed(column, 'tip_amount')

            # Tolls amount
            elif 'tolls_am' in column_name:
                # Drop negative values
                df = df.filter(col(column).isNull() | (col(column).cast('int') >= 0))
                # Convert to decimal
                df = df.withColumn(column, \
                    col(column).cast(DecimalType(6, 2)))
                # Rename column
                df = df.withColumnRenamed(column, 'tolls_amount')

            # Improvement surcharge
            elif 'improvement' in column_name:
                # Drop negative values
                df = df.filter(col(column).isNull() | (col(column).cast('int') >= 0))
                # Convert to decimal
                df = df.withColumn(column, \
                    col(column).cast(DecimalType(5, 2)))
                # Rename column
                df = df.withColumnRenamed(column, 'improvement_surcharge')

            # Total amount
            elif 'total_am' in column_name:
                # Drop negative values
                df = df.filter(col(column).isNull() | (col(column).cast('int') >= 0))
                # Convert to decimal
                df = df.withColumn(column, \
                    col(column).cast(DecimalType(6, 2)))
                # Rename column
                df = df.withColumnRenamed(column, 'total_amount')

            # Congestion surcharge
            elif column_name == 'congestion_surcharge' in column_name:
                # Drop negative values
                df = df.filter(col(column).isNull() | (col(column).cast('int') >= 0))
                # Convert to decimal
                df = df.withColumn(column, \
                    col(column).cast(DecimalType(5, 2)))
                # Rename column
                df = df.withColumnRenamed(column, 'congestion_surcharge')

            # Airport fee
            elif column_name == 'airport_fee':
                # Drop negative values
                df = df.filter(col(column).isNull() | (col(column).cast('int') >= 0))
                # Convert to decimal
                df = df.withColumn(column, \
                    col(column).cast(DecimalType(5, 2)))
                # Rename column
                df = df.withColumnRenamed(column, 'airport_fee')

            # CBD congestion surcharge
            elif column_name == 'cbd_congestion_fee':
                # Drop negative values
                df = df.filter(col(column).isNull() | (col(column).cast('int') >= 0))
                # Convert to decimal
                df = df.withColumn(column, \
                    col(column).cast(DecimalType(5, 2)))
                # Rename column
                df = df.withColumnRenamed(column, 'cbd_congestion_fee')

            # Pickup and dropoff location coordinates
            elif 'start_' in column_name or 'end_' in column_name:
                # Drop null values
                df = df.filter(col(column).isNotNull())
                # Drop outbursts
                df = drop_outbursts(df, column)

                # Convert to decimal
                df = df.withColumn(column, \
                    col(column).cast(DecimalType(9, 6)))
                # Rename column
                df = df.withColumnRenamed(column, column_name)

            else:
                continue
            
        df.show(5)
        logging.info(f'Rows after cleaning: {df.count()}')

        # Find taxi zone number by coordinates
        if not df_has_location_ids:
            df = get_zone_ids(df, spark)

    except Exception as e:
        logging.error(f'Error connecting to database {staging_db} from Spark cluster: {e}')

    finally:
        try:
            spark.stop()
        except:
            pass




In [29]:
def is_spark_master_available(host='localhost', port=7077, timeout=3):
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

if is_spark_master_available():
    print('Spark master доступен!')
else:
    print('Не удалось подключиться к Spark master.')

Spark master доступен!


In [86]:
#spark.stop()
staging_to_silver(clickhouse_staging_db_name, clickhouse_silver_db_name)


INFO:root:Connection to database nyc_taxi_staging successful. Processing data for silver layer.
a
ERROR:root:Error connecting to database nyc_taxi_staging from Spark cluster: 'JavaPackage' object is not callable


In [69]:
def silver_to_gold(file_name):
    """
    Create datamarts from silver layer
    """

    print('\t\t', file_name, 'gold')

In [58]:
# For each downloaded and unprocessed file with raw data
# - import to staging
# - process to silver
# - process to gold
unprocessed_files_list = get_unprocessed_files_list(clickhouse_client, clickhouse_staging_db_name)

for file in unprocessed_files_list:
    bronze_to_staging(clickhouse_client, clickhouse_staging_db_name, file, bronze_storage)
    #staging_to_silver(clickhouse_staging_db_name, clickhouse_silver_db_name)
    #silver_to_gold(file)

    # На время отладки
    break;

INFO:root:Data from file yellow_tripdata_2009-02.parquet successfully uploaded to database nyc_taxi_staging


In [None]:
# Добавляем данные в силвер

In [None]:
# делаем 27 и 28 в цикле до текущего месяца