In [None]:
import findspark
# Locate the local Spark installation and make its Python libraries importable;
# this has to run before anything imports pyspark.
findspark.init()
# Returns the Spark home directory that findspark resolved (shown as the cell output).
findspark.find()

In [None]:
import os

# Point Spark at a Java 8 JDK -- replace with your Java 8 path.
os.environ["JAVA_HOME"] = "C:\\Program Files\\Java\\jdk1.8.0_202"

# Put the JDK's bin directory first on PATH. os.path.join / os.pathsep replace the
# hand-built "\\bin;" string, and any existing copy of the entry is dropped before
# prepending, so re-running this cell no longer grows PATH on every execution.
_java_bin = os.path.join(os.environ["JAVA_HOME"], "bin")
_path_entries = [p for p in os.environ.get("PATH", "").split(os.pathsep) if p != _java_bin]
os.environ["PATH"] = os.pathsep.join([_java_bin, *_path_entries])

# Launch Spark locally with 3 worker threads.
os.environ["PYSPARK_SUBMIT_ARGS"] = "--master local[3] pyspark-shell"
os.environ["HADOOP_HOME"] = "C:\\Hadoop\\hadoop-3.0.0"

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, date_format, to_date, lit, concat

In [None]:
import pandas as pd

In [None]:
# Step 1: Initialize SparkSession (getOrCreate reuses a session already active in this kernel).
spark = (
    SparkSession.builder
    .appName("Partition CSV by Date")
    .getOrCreate()
)

## Reading the listings DataFrame

In [None]:
# Show full cell contents (no truncation) when displaying pandas frames.
pd.options.display.max_colwidth = None

In [None]:
# Load the listings CSV. NOTE: despite its name, df_reviews_detailed holds *listings* data.
# low_memory=False makes pandas infer each column's dtype from the whole file instead of
# chunk by chunk, which avoids mixed-type object columns (and pandas' DtypeWarning).
df_reviews_detailed = pd.read_csv('raw_data/listings/listings.csv', low_memory=False)

In [None]:
# Subset of listings columns carried forward. The order here becomes the column
# order of the selected DataFrame, so entries are kept in their original sequence.
columns_to_keep = [
    # identifiers / scrape metadata
    "id", "scrape_id", "last_scraped", "source",
    # listing name and host profile
    "name",
    "host_id", "host_name", "host_since", "host_location",
    "host_response_time", "host_response_rate", "host_acceptance_rate",
    "host_is_superhost", "host_neighbourhood",
    "host_listings_count", "host_total_listings_count",
    # location
    "neighbourhood", "neighbourhood_cleansed", "latitude", "longitude",
    # property details
    "property_type", "room_type", "accommodates",
    "bathrooms", "bathrooms_text", "bedrooms", "beds", "amenities",
    # price and stay-length limits
    "price",
    "minimum_nights", "maximum_nights",
    "minimum_minimum_nights", "maximum_minimum_nights",
    "minimum_maximum_nights", "maximum_maximum_nights",
    "minimum_nights_avg_ntm", "maximum_nights_avg_ntm",
    # calendar / availability
    "calendar_updated", "has_availability",
    "availability_30", "availability_60", "availability_90", "availability_365",
    "calendar_last_scraped",
    # review counts, dates and scores
    "number_of_reviews", "number_of_reviews_ltm", "number_of_reviews_l30d",
    "first_review", "last_review",
    "review_scores_rating", "review_scores_accuracy", "review_scores_cleanliness",
    "review_scores_checkin", "review_scores_communication",
    "review_scores_location", "review_scores_value",
    # licensing, booking and host portfolio
    "license", "instant_bookable",
    "calculated_host_listings_count",
    "calculated_host_listings_count_entire_homes",
    "calculated_host_listings_count_private_rooms",
    "calculated_host_listings_count_shared_rooms",
    "reviews_per_month",
]

In [None]:
# Keep only the columns of interest. .copy() makes the result an independent frame
# rather than a slice of the raw read, so the per-column assignments in the type
# cleaning step don't trigger pandas' SettingWithCopyWarning.
df_reviews_detailed = df_reviews_detailed[columns_to_keep].copy()

In [None]:
# Confirm which columns survived the selection.
df_reviews_detailed.columns

In [None]:
# Normalise column types so the pandas -> Spark conversion infers clean types.


def _to_nullable_str(series):
    """Return ``series`` as Python strings, keeping missing values as ``None``.

    ``astype(str)`` alone turns NaN into the literal text ``"nan"``, which Spark
    then stores (and writes to CSV) as if it were a real value. Mapping missing
    entries to ``None`` makes them Spark nulls instead. A column that is entirely
    missing is returned unchanged so Spark can still infer a type for it (an
    all-``None`` column would likely make Spark's schema inference fail).
    """
    if series.isna().all():
        return series
    return series.map(lambda value: None if pd.isna(value) else str(value))


# Free-text / categorical columns: cast to str but keep missing values as nulls.
for text_column in [
    "host_response_time",
    "host_neighbourhood",
    "neighbourhood",
    "has_availability",
    "bathrooms_text",
    "host_location",
    "host_name",
    "host_since",
    "property_type",
    "room_type",
    "host_is_superhost",
]:
    df_reviews_detailed[text_column] = _to_nullable_str(df_reviews_detailed[text_column])

# Rates are percentage strings (presumably like "95%"); store them as fractions.
for rate_column in ["host_response_rate", "host_acceptance_rate"]:
    df_reviews_detailed[rate_column] = (
        pd.to_numeric(df_reviews_detailed[rate_column].str.rstrip("%"), errors="coerce") / 100
    )

# Prices are currency strings. Strip "$" *and* thousands separators: with only the
# "$" removed, a value such as "$1,200.00" (if the source formats prices that way --
# confirm against the data) became "1,200.00", which to_numeric silently coerced to NaN.
df_reviews_detailed["price"] = pd.to_numeric(
    df_reviews_detailed["price"].str.replace(r"[$,]", "", regex=True), errors="coerce"
)

# Review dates are yyyy-mm-dd strings; parse them into datetimes.
for date_column in ["first_review", "last_review"]:
    df_reviews_detailed[date_column] = pd.to_datetime(
        df_reviews_detailed[date_column], format="%Y-%m-%d"
    )

In [None]:
# DataFrame.info() writes its report to stdout itself and returns None, so wrapping
# it in print() only appended a stray "None" line to the output.
df_reviews_detailed.info()

In [None]:
# Convert the cleaned pandas frame to a Spark DataFrame; no schema is passed, so
# Spark infers column types from the pandas data.
spark_df = spark.createDataFrame(df_reviews_detailed)

In [None]:
# Step 2: Define input and output locations.
# NOTE(review): input_file_path is not referenced by the cells shown here (the listings
# CSV is read from a literal path); output_file_path is the write target below.
input_file_path, output_file_path = "raw_data/listings/", "output_file_data/listings"

In [None]:
# Inspect the schema Spark inferred from the pandas frame.
spark_df.printSchema()

In [None]:
# Step 3: Derive date/partition columns from last_scraped.
# last_scraped is parsed once with an explicit yyyy-MM-dd format, and day, month_year
# and file_name are all derived from that parsed column. Previously day/month_year
# were formatted from the raw string (an implicit cast) while file_name used the
# parsed date, so the columns could disagree on malformed values; now they are null
# together whenever last_scraped does not parse.
spark_df = (
    spark_df
    .withColumn("formatted_last_scraped", to_date(col("last_scraped"), "yyyy-MM-dd"))
    .withColumn("day", date_format(col("formatted_last_scraped"), "dd"))
    .withColumn("month_year", date_format(col("formatted_last_scraped"), "MM-yyyy"))
    .withColumn(
        "file_name",
        concat(lit("listings_"), date_format(col("formatted_last_scraped"), "ddMMyyyy"), lit(".csv")),
    )
)

In [None]:
# Spot-check the derived partition key.
spark_df.select(col("month_year")).show(n=10, truncate=False)


In [None]:
# Step 4: Write the listings partitioned by month_year.
# Spark creates one directory per value: <output_file_path>/month_year=MM-yyyy/part-*.csv
# (part-file names are chosen by Spark; the file_name column is written as data and
# does not name the files). Rows with a null month_year go to Spark's default
# null-partition directory. mode("overwrite") replaces existing output at the path.
(
    spark_df.write
    .partitionBy("month_year")
    .mode("overwrite")
    .option("header", "true")
    .csv(output_file_path)
)

In [None]:
# Glob pattern for the month_year partition directories. output_file_path is defined
# without a trailing slash, so the separator must be added explicitly (without it the
# pattern became ".../listingsmonth_year=*").
output_partition_path = f"{output_file_path}/month_year=*"

In [None]:
# Show the partition glob pattern.
print(output_partition_path)

In [None]:
# os.walk() does not expand glob patterns, so walking a "month_year=*" pattern visited
# nothing. Walk the real output root instead and list the files in each partition dir.
for root, dirs, files in os.walk(output_file_path):
    if os.path.basename(root).startswith("month_year="):
        print(root, files)

In [None]:
# Rows that ended up without a month_year partition value.
null_values_df = spark_df.where(spark_df["month_year"].isNull())

In [None]:
# Inspect a few of those rows next to the raw last_scraped value.
null_values_df.select("id", "scrape_id", "month_year", "last_scraped").show(n=5)

In [None]:
## Testing
import random
import string

In [None]:
# NOTE: builder.getOrCreate() returns the session already active in this kernel if
# there is one, so spark_writer is likely the same session as `spark`; the
# appName below does not start a separate application in that case.
spark_writer = (
    SparkSession.builder
    .appName("WriteCSVTest")
    .getOrCreate()
)

In [None]:
# Generate a small synthetic dataset for the CSV write test.
# A dedicated, seeded RNG makes the generated rows reproducible on Restart & Run All.
TEST_DATA_SEED = 42
rng = random.Random(TEST_DATA_SEED)

# 100 rows of (5-letter random string, int in [1, 100], float between 1.0 and 100.0).
data = [
    (
        "".join(rng.choices(string.ascii_letters, k=5)),
        rng.randint(1, 100),
        rng.uniform(1.0, 100.0),
    )
    for _ in range(100)
]

# Column names only; Spark infers the column types from the Python values.
columns = ["name", "age", "score"]

# Create a PySpark DataFrame
df = spark_writer.createDataFrame(data, columns)

# Show the inferred schema and a few rows to confirm
df.printSchema()
df.show(5)


In [None]:
# Output location. Spark writes a *directory* at this path (containing part-*.csv
# files), even though the name ends in ".csv".
output_path = "output_file_data/test_output.csv"

# coalesce(1) collapses to a single partition so the directory holds one part file.
df.coalesce(1).write.mode("overwrite").option("header", "true").csv(output_path)

# Deliberately no spark_writer.stop(): getOrCreate() returned the session already
# active in this kernel (`spark`), so stopping it would also stop the SparkContext
# behind `spark` and every DataFrame built on it (e.g. spark_df).

## Reviews DataFrame

In [None]:
# Step 1: Initialize SparkSession for the reviews pipeline.
# NOTE(review): getOrCreate() returns any session already active in this kernel, and
# spark.driver.memory only takes effect if set before the driver JVM starts (e.g. via
# "--driver-memory 4g" in PYSPARK_SUBMIT_ARGS). The memory settings below therefore
# likely have no effect here -- confirm in the Spark UI Environment tab.
# spark.sql.shuffle.partitions is a runtime SQL setting.
spark_reviews = (
    SparkSession.builder
    .appName("Partition CSV by Date")
    .config("spark.executor.memory", "4g")
    .config("spark.driver.memory", "4g")
    .config("spark.sql.shuffle.partitions", "300")
    .getOrCreate()
)

In [None]:
# Read the reviews CSV with Spark. This defines df_reviews for the show()/count()
# cells that follow, which run before the pandas-based load and otherwise fail with
# NameError on a fresh kernel. multiLine + escape='"' let quoted fields span lines and
# use doubled quotes (as pandas parses them) -- assumed to matter for free-text review
# fields; confirm the row count matches the pandas load further down.
df_reviews = spark_reviews.read.options(
    delimiter=",", header=True, multiLine=True, escape='"'
).csv('raw_data/reviews/reviews_detailed.csv')

In [None]:
# Preview the first rows of df_reviews.
df_reviews.show(5)

In [None]:
# Row count of df_reviews (an action: Spark reads the data to compute it).
df_reviews.count()

In [None]:
# Root directory for the date-partitioned reviews output.
reviews_output_file_path = "output_file_data/reviews"

In [None]:
# Load the reviews with pandas. The path is relative to the notebook's working directory
# (matching the other raw_data reads) instead of an absolute, user-specific path.
# low_memory=False gives each column a single dtype inferred from the whole file.
df_pd_reviews_detailed = pd.read_csv('raw_data/reviews/reviews_detailed.csv', low_memory=False)

In [None]:
# Preview the pandas-loaded reviews.
df_pd_reviews_detailed.head()

In [None]:
# Convert the pandas reviews to Spark; this rebinds df_reviews, replacing any
# DataFrame previously assigned to that name.
df_reviews = spark_reviews.createDataFrame(df_pd_reviews_detailed)

In [None]:
# Peek at distinct listing ids. collect() pulled every distinct id back to the
# driver and printed them all; show() displays only the first few rows.
df_reviews.select('listing_id').distinct().show(10)

In [None]:
# Step 3: Parse the review date, then derive year/month/day partition columns and a
# file_name label from the parsed value (the "date" column is replaced in place).
df_reviews_dated = df_reviews.withColumn("date", to_date(col("date"), "yyyy-MM-dd"))

for part_name, pattern in (("day", "dd"), ("month", "MM"), ("year", "yyyy")):
    df_reviews_dated = df_reviews_dated.withColumn(part_name, date_format(col("date"), pattern))

df_reviews = df_reviews_dated.withColumn(
    "file_name",
    concat(lit("reviews_detailed_"), date_format(col("date"), "ddMMyyyy"), lit(".csv")),
)

In [None]:
# Hash-partition on the output partition columns so all rows for a given
# (year, month, day) land in the same task. With repartition(200) (round-robin),
# every task could hold rows for every date, so the partitioned write could emit up
# to 200 small files per day directory; this typically yields one file per day.
# The resulting partition count defaults to spark.sql.shuffle.partitions.
df_reviews = df_reviews.repartition("year", "month", "day")

In [None]:
# Preview the reviews with the derived date columns.
df_reviews.show(5)

In [None]:
# Step 4: Write the reviews partitioned by date. Spark creates one directory per value:
#   <reviews_output_file_path>/year=yyyy/month=MM/day=dd/part-*.csv
# mode("overwrite") replaces existing output under that path.
(
    df_reviews.write
    .partitionBy("year", "month", "day")
    .mode("overwrite")
    .option("header", "true")
    .csv(reviews_output_file_path)
)