This Notebook I will test how to implement a function that will take a csv and create a partitioned parquet file

Testing partition locally to see if it works with pyspark

cleaning my csv file with Csv class but changing the class so it doesnt convert to parquet. This is because its better to write the partitions directly to a parquet file.

In [10]:
import pandas as pd
import os
import pyarrow.parquet as pq
import pyarrow as pa

In [109]:
class CsvCleaner:
    @staticmethod
    def timestamp_clean(df:pd.DataFrame, col_name:str) -> tuple[pd.DataFrame,int]:
        """
        Cleans the DataFrame by converting the specified column to numeric, 
        filtering out rows with invalid timestamps, converting timestamps to datetime, 
        sorting by timestamp, and returning the cleaned DataFrame along with 
        the number of rows removed.

        Args:
            df (pd.DataFrame): The DataFrame to be cleaned.
            col_name (str): The name of the column containing timestamps.

        Returns:
            tuple[pd.DataFrame, int]: A tuple containing the cleaned DataFrame 
            and the number of rows removed.
        """
        # convert the column to numeric with any errors(for example strings or letter) to NaN
        df[col_name] = pd.to_numeric(df[col_name], errors="coerce")
        
        df.dropna(subset=[col_name], inplace=True)

        # Calculate the initial number of rows
        initial_rows = df.shape[0]
        
        #Filter out rows with "Timestamp" values not containing 10 digits
        df = df[df[col_name].apply(lambda x: len(str(int(x))) == 10)]

        #calculate how many rows removed
        rows_removed = initial_rows - df.shape[0]

        # Convert the Unix timestamp to datetime with seconds
        df[col_name] = pd.to_datetime(df[col_name], unit="s")

        # Sort the DataFrame by the timestamp column
        df = df.sort_values(by=col_name)

        return df, rows_removed
    
    @staticmethod
    def clean_columns(df: pd.DataFrame, col_name: str, low: float, high: float) -> tuple[pd.DataFrame, int]:
        """
        Cleans the specified column in the DataFrame by converting it to numeric, 
        filtering out values that are not within the specified range, 
        and returning the cleaned DataFrame along with the number of rows removed.

        Args:
            df (pd.DataFrame): The DataFrame to be cleaned.
            col_name (str): The name of the column to be cleaned.
            low (float): The lower bound of the acceptable range.
            high (float): The upper bound of the acceptable range.

        Returns:
            tuple[pd.DataFrame, int]: A tuple containing the cleaned DataFrame 
            and the number of rows removed.
        """

        # Convert column to numeric, making errors to Nan instead
        df[col_name] = pd.to_numeric(df[col_name], errors="coerce")

        # Calculate the initial number of rows
        initial_rows = df.shape[0]

        df.loc[~df[col_name].between(low, high), col_name] = float('nan')

        # Calculate the number of rows removed
        rows_removed = initial_rows - df.shape[0]

        #df[col_name] = df[col_name].interpolate()

        return df, rows_removed
    
    @staticmethod
    def clean_file(df: pd.DataFrame, file_key: str) ->str:
        """
        Cleans the DataFrame by applying specific cleaning operations based on column names,
        saves the cleaned DataFrame as a Parquet file, and returns the path to the cleaned Parquet file.

        Args:
            df (pd.DataFrame): The DataFrame to be cleaned.
            file_key (str): The key of the file.

        Returns:
            str: The path to the cleaned Parquet file.
        """
        
        total_rows_removed = 0

        # Clean the DataFrame
        for col in df.columns:
            if "Timestamp" in col:
                df, rows_removed = CsvCleaner.timestamp_clean(df.copy(), col)
                df[col] = pd.to_datetime(df[col])
                total_rows_removed += rows_removed
            
            if "speed_over_ground" in col:
                low = 0
                high = 100
                df, rows_removed = CsvCleaner.clean_columns(df.copy(), col, low, high)
                df[col] = df[col].astype(float)
                total_rows_removed += rows_removed
            
            if "Longitude" in col:
                low = -180
                high = 180
                df, rows_removed = CsvCleaner.clean_columns(df.copy(), col, low, high)
                df[col] = df[col].astype(float)
                total_rows_removed += rows_removed

            if "Latitude" in col:
                low = -90
                high = 90
                df, rows_removed = CsvCleaner.clean_columns(df.copy(), col, low, high)
                df[col] = df[col].astype(float)
                total_rows_removed += rows_removed

            if "engine_fuel_rate" in col:
                low = 0
                high = 100
                df,rows_removed = CsvCleaner.clean_columns(df.copy(), col, low, high)
                df[col] = df[col].astype(float)
                total_rows_removed += rows_removed
        
            
        # Resample the DataFrame
        df.set_index('Timestamp', inplace=True)
        df = df.resample('10s').mean()  # No fillna(0) here
        df = df.reset_index()
        
        # Save as partitioned Parquet file
        parquet_file = CsvCleaner._partition_and_save(df, file_key)
        
        print(f"Total rows removed: {total_rows_removed}")


        return parquet_file, file_key
    
    @staticmethod
    def _partition_and_save(df: pd.DataFrame, file_key: str) -> str:
        """
        Partitions and saves the cleaned DataFrame as a Parquet file.

        Args:
            df (pd.DataFrame): The DataFrame to be saved.
            file_key (str): The key of the file.

        Returns:    
            str: The path to the saved Parquet file.
        """
        # Extract vessel name from the file key
        vessel_name = file_key.split('_')[0] # use path lib here for realdata

        # Partition by timestamp and vessel name
        df["year"] = df["Timestamp"].dt.year.astype(str)
        df["month"] = df["Timestamp"].dt.month.astype(str).str.zfill(2)
        df["day"] = df["Timestamp"].dt.day.astype(str).str.zfill(2)
        df["vessel"] = vessel_name

        # Define the partition keys
        partition_cols = ["vessel", "year", "month", "day"]

        # Save as partitioned Parquet file
        cleaned_parquet_file = f"../src/Data/partitioned_parquets/{file_key}.parquet" # change this to temp file
        table = pa.Table.from_pandas(df)
        # root path need to go to s3
        pq.write_to_dataset(table, root_path="../src/Data/partitioned_parquets/", partition_cols=partition_cols)

        return cleaned_parquet_file

In [105]:
file_key = "vessel1_dummy_boat_data.csv"
df = pd.read_csv("../src/Data/vessel1_dummy_boat_data.csv")

In [106]:
df.head()

Unnamed: 0,Timestamp,speed_over_ground,Longitude,Latitude,engine_fuel_rate
0,1675119600,3.08,-119.605263,35.162237,14.92
1,1675119601,4.11,-118.857776,35.228734,21.02
2,1675119602,4.04,-118.110278,35.295227,20.53
3,1675119603,3.5,-117.362786,35.361725,17.21
4,1675119604,ERROR,-116.615295,35.428215,17.38


In [107]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000 entries, 0 to 999
Data columns (total 5 columns):
 #   Column             Non-Null Count  Dtype 
---  ------             --------------  ----- 
 0   Timestamp          990 non-null    object
 1   speed_over_ground  990 non-null    object
 2   Longitude          990 non-null    object
 3   Latitude           990 non-null    object
 4   engine_fuel_rate   990 non-null    object
dtypes: object(5)
memory usage: 39.2+ KB


In [108]:
cleaned_parquet_file = CsvCleaner.clean_file(df, file_key)

Total rows removed: 5


  df = df.resample('10S').mean()  # No fillna(0) here


In [110]:
pq.read_metadata("../src/Data/partitioned_parquets/vessel=vessel1/year=2023/month=01/day=30/9da8e6605723499d97fa3e8a292317e9-0.parquet")

<pyarrow._parquet.FileMetaData object at 0x7f276b59eb10>
  created_by: parquet-cpp-arrow version 15.0.0
  num_columns: 5
  num_rows: 100
  num_row_groups: 1
  format_version: 2.6
  serialized_size: 4540

In [111]:
table = pq.read_table("../src/Data/partitioned_parquets/vessel=vessel1/year=2023/month=01/day=30/9da8e6605723499d97fa3e8a292317e9-0.parquet")

df_parquet = table.to_pandas()

df_parquet.head()

Unnamed: 0,Timestamp,speed_over_ground,Longitude,Latitude,engine_fuel_rate
0,2023-01-30 23:00:00,3.822222,-116.241552,35.46146,18.863
1,2023-01-30 23:00:10,4.55,-108.766642,36.159641,22.651
2,2023-01-30 23:00:20,5.412,-101.291727,36.795025,27.07
3,2023-01-30 23:00:30,14.632222,-93.941394,37.445181,30.208889
4,2023-01-30 23:00:40,7.013333,-82.81208,38.435198,35.202222


In [112]:
file_key_2 = "vessel2_dummy_boat_data.csv"
df_2 = pd.read_csv("../src/Data/vessel2_dummy_boat_data.csv")


In [113]:
cleaned_parquet_file = CsvCleaner.clean_file(df_2, file_key_2)

Total rows removed: 5


In [114]:
table = pq.read_table("../src/Data/partitioned_parquets/vessel=vessel2/year=2023/month=01/day=30/a664216dec3f4e2787509cdbfed1a2ba-0.parquet")

df_parquet = table.to_pandas()

df_parquet.head()

Unnamed: 0,Timestamp,speed_over_ground,Longitude,Latitude,engine_fuel_rate
0,2023-01-30 23:00:00,3.822222,-116.241552,35.46146,18.863
1,2023-01-30 23:00:10,4.55,-108.766642,36.159641,22.651
2,2023-01-30 23:00:20,5.412,-101.291727,36.795025,27.07
3,2023-01-30 23:00:30,14.632222,-93.941394,37.445181,30.208889
4,2023-01-30 23:00:40,7.013333,-82.81208,38.435198,35.202222


In [7]:
import os
import boto3
from io import BytesIO

In [8]:
# accessing keys from .env file
AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")
AWS_REGION_NAME = os.getenv("AWS_REGION_NAME")

In [13]:
# checking if lambda function works

# Create an S3 client
s3 = boto3.client('s3', 
                  aws_access_key_id=AWS_ACCESS_KEY_ID, 
                  aws_secret_access_key=AWS_SECRET_ACCESS_KEY, 
                  region_name=AWS_REGION_NAME)

# Read CSV file from S3 into a Pandas DataFrame
try:
    # Use 's3.get_object' to get the object and 'pd.read_csv' to read it into a DataFrame
    obj = s3.get_object(Bucket="new-parquet-files", Key="vessel1/year=2023/month=01/day=30/vessel1_dummy_boat_data.parquet")

    obj_bytes = obj["Body"].read()

    obj_bytes_io = BytesIO(obj_bytes)

    df = pd.read_parquet(obj_bytes_io)
    print(df)

except Exception as e:
    print(f"Error: {e}")

             Timestamp  speed_over_ground   Longitude   Latitude  \
0  2023-01-30 23:00:00           3.822222 -116.241552  35.461460   
1  2023-01-30 23:00:10           4.550000 -108.766642  36.159641   
2  2023-01-30 23:00:20           5.412000 -101.291727  36.795025   
3  2023-01-30 23:00:30          14.632222  -93.941394  37.445181   
4  2023-01-30 23:00:40           7.013333  -82.812080  38.435198   
..                 ...                ...         ...        ...   
95 2023-01-30 23:15:50          13.820000   84.295656  28.838670   
96 2023-01-30 23:16:00          14.364000   85.556275  34.389400   
97 2023-01-30 23:16:10          14.905000   86.849217  39.924228   
98 2023-01-30 23:16:20          15.516667   88.106244  45.944147   
99 2023-01-30 23:16:30          16.058889   89.435102  51.502830   

    engine_fuel_rate   vessel  
0          18.863000  vessel1  
1          22.651000  vessel1  
2          27.070000  vessel1  
3          30.208889  vessel1  
4          35.202222  v