In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col, regexp_replace
from pyspark.sql.types import IntegerType

In [2]:
# Initialize a Spark session.
# The PostgreSQL JDBC driver JAR is put on the driver classpath so that the
# 'org.postgresql.Driver' JDBC writes later in this notebook can load it.
# Set the POSTGRES_JDBC_JAR environment variable to point at a different JAR
# instead of editing this hard-coded local path.
import os

POSTGRES_JDBC_JAR = os.environ.get(
    'POSTGRES_JDBC_JAR',
    '/usr/lib/jvm/java-11-openjdk-amd64/lib/postgresql-42.5.0.jar',
)

spark = (
    SparkSession.builder
    .appName("DataTransformation")
    .config('spark.driver.extraClassPath', POSTGRES_JDBC_JAR)
    .getOrCreate()
)

23/09/11 14:51:25 WARN Utils: Your hostname, kushal-Latitude-E5440 resolves to a loopback address: 127.0.1.1; using 192.168.1.14 instead (on interface wlp2s0)
23/09/11 14:51:25 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/09/11 14:51:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Load the raw tab-separated exports: the first row supplies column names and
# Spark infers each column's type from the data.
tsv_read_options = {'header': True, 'inferSchema': True, 'sep': "\t"}

listing_df_raw = spark.read.csv('raw_data/listings.tsv', **tsv_read_options)
reviews_df_raw = spark.read.csv('raw_data/reviews.tsv', **tsv_read_options)
calendar_df_raw = spark.read.csv('raw_data/calendar.tsv', **tsv_read_options)

                                                                                

In [4]:
# Transform the data

# LISTINGS DATA
# NOTE(review): this cell rebinds `listing_df_raw` to its own transformed output,
# so it is not idempotent; re-run the load cell before re-running this one.
# The name is kept because later cells read `listing_df_raw`.

# Drop the free-text columns 'summary', 'description' and 'host_about',
# plus 'country' and 'market'.
listing_df_raw = listing_df_raw.drop('summary', 'description', 'host_about', 'country', 'market')

# Convert 'host_is_superhost' to boolean: 't' -> True, anything else (including null) -> False.
listing_df_raw = listing_df_raw.withColumn(
    'host_is_superhost',
    when(col('host_is_superhost') == 't', True).otherwise(False),
)

# Drop rows with a null 'space' or a null 'property_type'.
listing_df_raw = listing_df_raw.na.drop(subset=['space', 'property_type'])

# Strip "$" and thousands separators, parse as a number, then truncate to an integer.
# The decimal point is kept before parsing: removing every non-digit would turn a
# value with cents such as "$150.00" into 15000.
listing_df_raw = listing_df_raw.withColumn(
    'price',
    regexp_replace(col('price'), '[^0-9.]', '').cast('double').cast(IntegerType()),
)

# Remove commas from string columns only. Running regexp_replace on a non-string
# column implicitly casts it to string, which would undo the boolean and integer
# conversions above. A single select also keeps the query plan small compared with
# calling withColumn once per column.
string_columns = {name for name, dtype in listing_df_raw.dtypes if dtype == 'string'}
listing_df_raw = listing_df_raw.select(*[
    regexp_replace(col(name), ',', '').alias(name) if name in string_columns else col(name)
    for name in listing_df_raw.columns
])

In [5]:
# Preview the first 20 transformed listing rows (long cell values are truncated).
listing_df_raw.show(n=20, truncate=True)

                                                                                

+--------+--------------------+--------------------+--------------------+-----------+--------------------+------------------+------------------+--------------------+-----------------+----------+--------------------+----------+-----+-------+-------------+---------------+------------+---------+--------+-----+-----------------+
|      id|         listing_url|                name|               space|  host_name|       host_location|host_response_time|host_response_rate|host_acceptance_rate|host_is_superhost|host_since|              street|      city|state|zipcode|property_type|      room_type|accommodates|bathrooms|bedrooms|price|number_of_reviews|
+--------+--------------------+--------------------+--------------------+-----------+--------------------+------------------+------------------+--------------------+-----------------+----------+--------------------+----------+-----+-------+-------------+---------------+------------+---------+--------+-----+-----------------+
|12147973|https://w

In [6]:
# Preview the first 20 raw calendar rows (long cell values are truncated).
calendar_df_raw.show(n=20, truncate=True)

+----------+----------+---------+-----+
|listing_id|      date|available|price|
+----------+----------+---------+-----+
|  12147973|2017-09-05|        f| null|
|  12147973|2017-09-04|        f| null|
|  12147973|2017-09-03|        f| null|
|  12147973|2017-09-02|        f| null|
|  12147973|2017-09-01|        f| null|
|  12147973|2017-08-31|        f| null|
|  12147973|2017-08-30|        f| null|
|  12147973|2017-08-29|        f| null|
|  12147973|2017-08-28|        f| null|
|  12147973|2017-08-27|        f| null|
|  12147973|2017-08-26|        f| null|
|  12147973|2017-08-25|        f| null|
|  12147973|2017-08-24|        f| null|
|  12147973|2017-08-23|        f| null|
|  12147973|2017-08-22|        f| null|
|  12147973|2017-08-21|        f| null|
|  12147973|2017-08-20|        f| null|
|  12147973|2017-08-19|        f| null|
|  12147973|2017-08-18|        f| null|
|  12147973|2017-08-17|        f| null|
+----------+----------+---------+-----+
only showing top 20 rows



In [7]:
# CALENDAR DATA
# NOTE(review): like the listings cell, this rebinds `calendar_df_raw` to its own
# output; re-run the load cell before re-running this one.

# Convert 'available' to boolean: 't' -> True, anything else (including null) -> False.
calendar_df_raw = calendar_df_raw.withColumn(
    'available',
    when(col('available') == 't', True).otherwise(False),
)

# Strip "$" and thousands separators, parse as a number, then truncate to an integer.
# The decimal point is kept before parsing so that a value with cents such as
# "$150.00" becomes 150, not 15000. Null prices stay null.
calendar_df_raw = calendar_df_raw.withColumn(
    'price',
    regexp_replace(col('price'), '[^0-9.]', '').cast('double').cast(IntegerType()),
)

In [8]:
# Save each dataset as snappy-compressed Parquet, coalesced to at most 4
# partitions, replacing any previous output at the same path.
parquet_outputs = [
    (listing_df_raw, 'cleaned_data/clean_listing_parquet'),
    (calendar_df_raw, 'cleaned_data/clean_calendar_parquet'),
    (reviews_df_raw, 'cleaned_data/clean_reviews_parquet'),
]
for frame, target_path in parquet_outputs:
    frame.coalesce(4).write.parquet(target_path, mode="overwrite", compression="snappy")

                                                                                

The cell below writes the cleaned DataFrames (listings, calendar, and reviews) to the PostgreSQL database.

In [13]:
# Write the cleaned DataFrames to PostgreSQL via JDBC in 'overwrite' mode.
# Connection settings come from the environment; the URL and user default to the
# previous local setup. The password is never stored in the notebook: it is read
# from PGPASSWORD or prompted for interactively.
import getpass
import os

PG_URL = os.environ.get('PG_URL', 'jdbc:postgresql://localhost:5432/Final')
PG_USER = os.environ.get('PG_USER', 'postgres')
PG_PASSWORD = os.environ.get('PGPASSWORD') or getpass.getpass('PostgreSQL password: ')

jdbc_options = {
    'url': PG_URL,
    'driver': 'org.postgresql.Driver',
    'user': PG_USER,
    'password': PG_PASSWORD,
}

for frame, table_name in [
    (listing_df_raw, 'listing'),
    (calendar_df_raw, 'calendar'),
    (reviews_df_raw, 'reviews'),
]:
    frame.write.format('jdbc').options(dbtable=table_name, **jdbc_options).mode('overwrite').save()

23/09/11 15:12:08 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                