In [1]:
from pyspark.sql import SparkSession

# Create (or reuse) the notebook's Spark session.
spark = (
    SparkSession.builder
    .appName("Taxi_analysis_2024")
    .config("spark.ui.port", "4041")
    .config("spark.driver.memory", "1g")
    .config("spark.executor.memory", "1g")
    # Put the PostgreSQL JDBC driver jar on the session's jar list.
    # NOTE(review): absolute, machine-specific path; consider making it configurable.
    .config("spark.jars", r"C:\Program Files\PostgreSQL\17\jdbc\postgresql-42.7.5.jar")
    .getOrCreate()
)

# Keep notebook output readable: only ERROR-level Spark logs are printed.
spark.sparkContext.setLogLevel("ERROR")
spark.conf.set("spark.hadoop.io.native.lib.available", "false")


In [None]:
# Read back the "spark.jars" setting from the active SparkContext configuration
# to confirm the JDBC jar configured on the session builder was picked up.
spark.sparkContext.getConf().get("spark.jars")

In [3]:
import os

# Load every monthly trip parquet file under `directory` into `data`,
# keyed by the third "_"-separated part of the file name without its extension.
directory = r"F:\data project\Taxi_track_2024\data\2024"
data = {}
for root, _, files in os.walk(directory):
    # Sorted so the load order (and therefore the later union order) is
    # deterministic instead of depending on the filesystem's listing order.
    for file in sorted(files):
        # Skip anything that is not a parquet file (checksums, markers, ...);
        # spark.read.parquet would fail on it.
        if not file.lower().endswith(".parquet"):
            continue
        # Derive the key from the bare file name rather than from a fixed
        # position in the absolute path, so the key does not depend on how
        # deep `directory` is or on the OS path separator.
        file_name = file.split("_")[2].split(".")[0]
        data[file_name] = spark.read.parquet(os.path.join(root, file))
    

In [4]:
# Load every lookup CSV under `directory` into `lookups`,
# keyed by the file name up to its first ".".
lookups = {}
directory = r"F:\data project\Taxi_track_2024\data\lookups"
for root, _, files in os.walk(directory):
    for file in sorted(files):
        # Only CSV files are lookup tables; ignore anything else in the tree.
        if not file.lower().endswith(".csv"):
            continue
        # Key on the bare file name instead of a fixed index into the absolute
        # path, so moving the data directory does not change or break the keys.
        file_name = file.split(".")[0]
        lookups[file_name] = spark.read.csv(
            os.path.join(root, file), header=True, inferSchema=True
        )
   


In [None]:
# Preview the payment-type lookup table (joined on payment_id further down).
lookups['payment_type_lookup'].show()

In [None]:
# Preview the rate-code lookup table (joined on rate_id further down).
lookups['rate_lookup'].show()

In [None]:
# Preview the taxi-zone lookup table (joined on LocationID for both pickup and dropoff).
lookups['taxi_zone_lookup'].show()

In [None]:
# Preview the vendor lookup table (joined on vendor_id further down).
lookups['vendor_lookup'].show()

In [None]:
# Preview the store-and-forward flag lookup table (joined on flag further down).
lookups['Store_and_fwd_flag'].show()

In [5]:
from pyspark.sql.functions import col, year, month, dayofmonth, hour, minute, when, date_format,unix_timestamp,count,round,concat_ws,lit

In [6]:
# Build df_marged: each monthly trip frame is decoded through the lookup tables,
# enriched with calendar features, projected to the reporting columns, and the
# monthly results are stacked into a single frame.

# The zone lookup is joined twice (pickup and dropoff), so each use gets its own alias.
taxi_zone_pickup = lookups['taxi_zone_lookup'].alias("pickup_zone")
taxi_zone_dropoff = lookups['taxi_zone_lookup'].alias("dropoff_zone")


def _time_of_day(hour_column):
    """Return a Column that buckets an hour-of-day column into a named day part.

    Buckets: 00-05 Night, 06-11 Morning, 12-17 Afternoon, 18-23 Evening.
    Rows whose hour is null fall through every branch and stay null.
    """
    hour_value = col(hour_column)
    return (
        when(hour_value.between(0, 5), "Night")
        .when(hour_value.between(6, 11), "Morning")
        .when(hour_value.between(12, 17), "Afternoon")
        .when(hour_value.between(18, 23), "Evening")
    )


def _with_calendar_parts(trips, prefix):
    """Add month/day/day_of_week/hour/minute/time_of_day columns for one timestamp.

    `prefix` is "pickup" or "dropoff"; values are derived from
    tpep_<prefix>_datetime and written to <prefix>_<part> columns.
    """
    timestamp_column = f"tpep_{prefix}_datetime"
    return (
        trips
        .withColumn(f"{prefix}_month", month(timestamp_column))
        .withColumn(f"{prefix}_day", dayofmonth(timestamp_column))
        .withColumn(f"{prefix}_day_of_week", date_format(timestamp_column, "EEEE"))
        .withColumn(f"{prefix}_hour", hour(timestamp_column))
        .withColumn(f"{prefix}_minute", minute(timestamp_column))
        .withColumn(f"{prefix}_time_of_day", _time_of_day(f"{prefix}_hour"))
    )


def _prepare_month(trips):
    """Decode, enrich and project one monthly trip frame to the reporting columns."""
    rate_lookup = lookups['rate_lookup']
    payment_lookup = lookups['payment_type_lookup']
    vendor_lookup = lookups['vendor_lookup']
    flag_lookup = lookups['Store_and_fwd_flag']

    # Left joins keep every trip even when a code has no match in a lookup table.
    decoded = (
        trips
        .join(rate_lookup, rate_lookup.rate_id == trips.RatecodeID, 'left')
        .join(payment_lookup, payment_lookup.payment_id == trips.payment_type, 'left')
        .join(taxi_zone_pickup, col('PULocationID') == col("pickup_zone.LocationID"), 'left')
        .join(taxi_zone_dropoff, col('DOLocationID') == col("dropoff_zone.LocationID"), 'left')
        .join(vendor_lookup, vendor_lookup.vendor_id == trips.VendorID, 'left')
        .join(flag_lookup, flag_lookup.flag == trips.store_and_fwd_flag, 'left')
        .withColumn('tpep_pickup_datetime', col('tpep_pickup_datetime').cast('timestamp'))
        .withColumn('tpep_dropoff_datetime', col('tpep_dropoff_datetime').cast('timestamp'))
        # Trip duration in minutes, rounded to two decimals.
        .withColumn(
            "duration",
            round(
                (unix_timestamp(col("tpep_dropoff_datetime"))
                 - unix_timestamp(col("tpep_pickup_datetime"))) / 60,
                2,
            ),
        )
    )
    enriched = _with_calendar_parts(_with_calendar_parts(decoded, "pickup"), "dropoff")

    # Project before the union: every monthly frame then has exactly the same
    # columns in the same order, so the positional union below cannot misalign
    # columns, and the zone aliases are resolved while they are unambiguous.
    return enriched.select(
        col('vendor_name'),
        col('flag_name'),
        col('pickup_zone.Borough').alias("PickupBorough"),
        col('pickup_zone.Zone').alias("PickupZone"),
        col('pickup_zone.service_zone').alias("Pickupservice_zone"),
        col('dropoff_zone.Borough').alias("DropoffBorough"),
        col('dropoff_zone.Zone').alias("DropoffZone"),
        col('dropoff_zone.service_zone').alias("Dropoffservice_zone"),
        col('payment_name'),
        col('rate_name'),
        col('tpep_pickup_datetime'),
        col('pickup_month'),
        col('pickup_day'),
        col('pickup_day_of_week'),
        col('pickup_hour'),
        col('pickup_minute'),
        col('pickup_time_of_day'),
        col('tpep_dropoff_datetime'),
        col('dropoff_month'),
        col('dropoff_day'),
        col('dropoff_day_of_week'),
        col('dropoff_hour'),
        col('dropoff_minute'),
        col('dropoff_time_of_day'),
        col('duration'),
        col('passenger_count'),
        col('trip_distance'),
        col('fare_amount'),
        col('extra'),
        col('mta_tax'),
        col('tip_amount'),
        col('improvement_surcharge'),
        col('total_amount'),
        col('congestion_surcharge'),
        col('Airport_fee'),
    )


monthly_frames = [_prepare_month(data[month_key]) for month_key in data]
if not monthly_frames:
    # Fail with a clear message instead of a NameError on df_marged further down.
    raise ValueError("no monthly trip frames were loaded into `data`")

df_marged = monthly_frames[0]
for monthly_frame in monthly_frames[1:]:
    df_marged = df_marged.union(monthly_frame)

# Replace nulls: missing counts/fees become 0, unmatched lookup names become "unknown".
df_marged = df_marged.fillna({
    "passenger_count": 0,
    "congestion_surcharge": 0,
    "Airport_fee": 0,
    "flag_name": "unknown",
    "payment_name": "unknown",
    "rate_name": "unknown",
})




In [None]:
# df_marged only carries the decoded flag_name column (the raw
# store_and_fwd_flag is not part of the reporting select), so grouping
# df_marged by it fails. Count the raw flag on the monthly source frames instead.
raw_flags = None
for monthly_trips in data.values():
    monthly_flags = monthly_trips.select("store_and_fwd_flag")
    raw_flags = monthly_flags if raw_flags is None else raw_flags.union(monthly_flags)
raw_flags.groupBy("store_and_fwd_flag").count().show()

In [None]:
# Row count per decoded store-and-forward flag name ("unknown" marks rows filled by fillna).
df_marged.groupBy("flag_name").count().show()

In [None]:
# rate_id is not kept by the reporting select, so group by the decoded
# rate_name instead ("unknown" marks trips whose rate code had no lookup match).
df_marged.groupBy("rate_name").count().show()

In [None]:
# payment_id is not kept by the reporting select, so group by the decoded
# payment_name instead ("unknown" marks trips whose payment type had no lookup match).
df_marged.groupBy("payment_name").count().show()

In [None]:
# Total number of rows in the merged trips frame.
df_marged.count()

In [None]:
from pyspark.sql.functions import sum, avg, count, max, min,format_number

In [7]:
from pyspark.sql import functions as F


def aggregate_Taxi(df, groupby_cols):
    """Aggregate trip measures per combination of the given grouping columns.

    Args:
        df: Spark DataFrame holding the measure columns listed in `measures` below.
        groupby_cols: iterable of column names to group by.

    Returns:
        A Spark DataFrame with the grouping columns, then for every measure the
        columns sum_/avg_/max_/min_<alias> (in that order), then `Trip_counts`.
        The statistics go through `format_number(..., 2)`, so they are
        formatted strings, not numeric columns.
    """
    # (source column, suffix used in the output column names)
    measures = [
        ('trip_distance', 'trip_distance_in_mile'),
        ('duration', 'trip_duration'),
        ('fare_amount', 'fare_amount'),
        ('extra', 'extra'),
        ('mta_tax', 'mta_tax'),
        ('tip_amount', 'tip_amount'),
        ('improvement_surcharge', 'improvement_surcharge'),
        ('total_amount', 'total_amount'),
        ('congestion_surcharge', 'congestion_surcharge'),
        ('Airport_fee', 'Airport_fee'),
    ]
    # (prefix used in the output column names, aggregate function)
    statistics = [
        ('sum', F.sum),
        ('avg', F.avg),
        ('max', F.max),
        ('min', F.min),
    ]

    agg_exprs = [
        F.format_number(stat_fn(source_col), 2).alias(f'{stat_name}_{alias_suffix}')
        for source_col, alias_suffix in measures
        for stat_name, stat_fn in statistics
    ]
    # Count rows, not non-null Airport_fee values: counting a column skips its
    # nulls, which would under-report trips on a frame whose Airport_fee has
    # not been null-filled.
    agg_exprs.append(F.count(F.lit(1)).alias('Trip_counts'))

    grouping = [F.col(column_name) for column_name in groupby_cols]
    return df.groupBy(grouping).agg(*agg_exprs)
    


In [None]:

# Finest-grained breakdown: trip attributes, then every pickup dimension,
# then every dropoff dimension.
groupby_cols = [
    # trip attributes
    'vendor_name', 'flag_name', 'payment_name', 'rate_name',
    # pickup dimensions
    'PickupBorough', 'PickupZone', 'Pickupservice_zone',
    'pickup_month', 'pickup_day', 'pickup_day_of_week', 'pickup_time_of_day',
    # dropoff dimensions
    'DropoffBorough', 'DropoffZone', 'Dropoffservice_zone',
    'dropoff_month', 'dropoff_day', 'dropoff_day_of_week', 'dropoff_time_of_day',
]
a = aggregate_Taxi(df_marged, groupby_cols)

In [8]:
# Grouping-column sets for the insight tables. Despite the df* names these are
# plain lists of column names, not DataFrames.
df1 = ['vendor_name', 'flag_name', 'payment_name', 'rate_name']
df2 = ['DropoffZone', 'dropoff_month']
df3 = ['PickupZone', 'pickup_month']
df4 = ['PickupZone', 'DropoffZone']
df5 = ['PickupZone', 'pickup_day_of_week']
df6 = ['DropoffZone', 'pickup_day_of_week']
df7 = ['pickup_month', 'pickup_day_of_week']

# Human-readable description -> grouping columns used for that insight.
insight = dict([
    ('Vendor, flag type, rate, and payment type analysis', df1),
    ('Dropoff location and month analysis', df2),
    ('Pickup location and month analysis', df3),
    ('Pickup and dropoff zone relationship analysis', df4),
    ('Pickup location and day of the week analysis', df5),
    ('Dropoff location and pickup day of the week analysis', df6),
    ('Pickup month and day of the week trends', df7),
])


In [None]:
# Run the shared aggregation once per insight and keep each result under the
# insight's description.
aggregatation_piovte_table = {
    description: aggregate_Taxi(df_marged, dimensions)
    for description, dimensions in insight.items()
}

In [17]:
def check_file_exists(index_path):
    """Report whether anything exists at ``index_path``.

    Args:
        index_path: filesystem path to probe.

    Returns:
        True when the path exists (file or directory), otherwise False.
    """
    path_is_present = os.path.exists(index_path)
    return path_is_present

In [12]:
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import FAISS
from langchain.schema import Document
import os
from LLM_integration.data_ingestion import prepare_for_embadding
from utils import check_file_exists


def documents_converter(data_dict, rows_per_document=1000):
    """Turn each DataFrame in ``data_dict`` into text ``Document`` objects.

    Every row is rendered as its values joined by ", " and rows are packed,
    newline-separated, into documents of at most ``rows_per_document`` rows.

    Args:
        data_dict: mapping of source name -> DataFrame exposing ``toLocalIterator()``.
        rows_per_document: maximum number of rows per document (default 1000,
            the batch size this function always used).

    Returns:
        List of ``Document`` objects whose ``metadata["source"]`` is the
        mapping key the rows came from.

    Raises:
        ValueError: if ``rows_per_document`` is smaller than 1.
    """
    if rows_per_document < 1:
        raise ValueError("rows_per_document must be at least 1")

    print("here we convert to doc")
    documents = []

    def flush(text_rows, source):
        # Pack the buffered rows into one document tagged with its source.
        documents.append(
            Document(page_content="\n".join(text_rows), metadata={"source": source})
        )

    for filename, df in data_dict.items():
        print(filename)
        text_rows = []
        # toLocalIterator is used so rows are iterated rather than collected as one list.
        for row in df.toLocalIterator():
            text_rows.append(", ".join(map(str, row)))
            if len(text_rows) >= rows_per_document:
                flush(text_rows, filename)
                text_rows = []

        # Emit the final, partially filled batch (if any).
        if text_rows:
            flush(text_rows, filename)

    return documents


def text_spliter(data):
    """Split documents into chunks of at most 500 characters with no overlap.

    Args:
        data: documents accepted by ``RecursiveCharacterTextSplitter.split_documents``.

    Returns:
        The chunks produced by the splitter.
    """
    print("here we splite to chunks")
    splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=0)
    return splitter.split_documents(data)


def Embeddings_and_vectordb(data,vdb_path,cheak_path):
    """Return a FAISS vector store for ``data``, reusing a saved index when present.

    Args:
        data: input handed to ``prepare_for_embadding`` when the index must be built.
        vdb_path: folder the FAISS index is saved to and loaded from.
        cheak_path: path whose existence signals that the index was already built.

    Returns:
        The loaded or newly built FAISS vector store.
    """
    # The same embedding model is needed to build the index and to query a saved one.
    embedding = OpenAIEmbeddings(openai_api_key=os.getenv("OPENAI_API_KEY"))

    if check_file_exists(cheak_path):
        # Load from the folder save_local() writes to (vdb_path). Loading from
        # cheak_path only worked when both arguments were the same folder.
        # SECURITY: allow_dangerous_deserialization unpickles the stored
        # docstore; only load indexes this project wrote itself.
        return FAISS.load_local(
            vdb_path,
            embeddings=embedding,
            allow_dangerous_deserialization=True,
        )

    data_dictionary = prepare_for_embadding(data)
    documents = documents_converter(data_dictionary)
    chunks = text_spliter(documents)
    print("here we embedding")
    docsearch = FAISS.from_documents(chunks, embedding)
    docsearch.save_local(vdb_path)
    return docsearch


 