In [1]:
#import libraries
import pandas as pd
import numpy as np
import psycopg2
import pyspark
from pyspark.sql import SparkSession
from sqlalchemy import create_engine
from pyspark.sql.functions import isnan, when, count, col


# Extract data from CSV files

In [2]:
spark = SparkSession.builder.appName("Proj").getOrCreate()

24/12/02 11:05:15 WARN Utils: Your hostname, MacBook-Pro-Anna-3.local resolves to a loopback address: 127.0.0.1; using 10.10.227.36 instead (on interface en0)
24/12/02 11:05:15 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/02 11:05:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## contacts.csv

In [3]:
df_con = spark.read.option('header', 'true').csv("data/contacts.csv", inferSchema=True)
df_con.show(5)

                                                                                

+--------------------+--------------------+--------------------+--------------------+-------------------+--------------------+-------------------+----------------+-----------------+--------+--------------+------------------------------------+---------------------+----------------------+
|       id_guest_anon|        id_host_anon|     id_listing_anon|ts_interaction_first|  ts_reply_at_first|ts_accepted_at_first|      ts_booking_at|ds_checkin_first|ds_checkout_first|m_guests|m_interactions|m_first_message_length_in_characters|contact_channel_first|guest_user_stage_first|
+--------------------+--------------------+--------------------+--------------------+-------------------+--------------------+-------------------+----------------+-----------------+--------+--------------+------------------------------------+---------------------+----------------------+
|da8656a1-51af-4f3...|5426897d-960d-401...|a408a8b2-0d44-451...| 2016-04-21 02:55:53|2016-04-21 03:15:00| 2016-04-21 03:15:00|2016-04-21

In [4]:
df_con.printSchema()

root
 |-- id_guest_anon: string (nullable = true)
 |-- id_host_anon: string (nullable = true)
 |-- id_listing_anon: string (nullable = true)
 |-- ts_interaction_first: timestamp (nullable = true)
 |-- ts_reply_at_first: timestamp (nullable = true)
 |-- ts_accepted_at_first: timestamp (nullable = true)
 |-- ts_booking_at: timestamp (nullable = true)
 |-- ds_checkin_first: date (nullable = true)
 |-- ds_checkout_first: date (nullable = true)
 |-- m_guests: double (nullable = true)
 |-- m_interactions: integer (nullable = true)
 |-- m_first_message_length_in_characters: double (nullable = true)
 |-- contact_channel_first: string (nullable = true)
 |-- guest_user_stage_first: string (nullable = true)



In [5]:
count_dup_rows = df_con.count() - df_con.distinct().count()
print(f"Number of duplicate rows: {count_dup_rows}")


[Stage 6:>                                                          (0 + 2) / 2]

Number of duplicate rows: 0


                                                                                

In [6]:
Dict_Null = {col:df_con.filter(df_con[col].isNull()).count() for col in df_con.columns}
print(f"Number of Null values in each column: ")
for key, value in Dict_Null.items():
    print(key, ": ", value)

Number of Null values in each column: 
id_guest_anon :  0
id_host_anon :  0
id_listing_anon :  0
ts_interaction_first :  0
ts_reply_at_first :  2032
ts_accepted_at_first :  11472
ts_booking_at :  16300
ds_checkin_first :  0
ds_checkout_first :  0
m_guests :  1
m_interactions :  0
m_first_message_length_in_characters :  0
contact_channel_first :  0
guest_user_stage_first :  0


## listings.csv

In [7]:
df_lis = spark.read.option('header', 'true').csv("data/listings.csv", inferSchema=True)
df_lis.show(5)

+--------------------+---------------+--------------------+-------------+
|     id_listing_anon|      room_type|listing_neighborhood|total_reviews|
+--------------------+---------------+--------------------+-------------+
|71582793-e5f8-46d...|   Private room|           -unknown-|          0.0|
|a1a3f728-e21f-443...|Entire home/apt|          Copacabana|          0.0|
|353a68be-ecf9-4b7...|Entire home/apt|     Barra da Tijuca|          3.0|
|b9ae1908-0486-40a...|Entire home/apt|                Lapa|          4.0|
|fa0290ef-7881-448...|Entire home/apt|           -unknown-|          0.0|
+--------------------+---------------+--------------------+-------------+
only showing top 5 rows



In [8]:
df_lis.printSchema()

root
 |-- id_listing_anon: string (nullable = true)
 |-- room_type: string (nullable = true)
 |-- listing_neighborhood: string (nullable = true)
 |-- total_reviews: double (nullable = true)



In [9]:
count_dup_rows = df_lis.count() - df_lis.distinct().count()
print(f"Number of duplicate rows: {count_dup_rows}")

Number of duplicate rows: 0


In [10]:
Dict_Null = {col:df_lis.filter(df_lis[col].isNull()).count() for col in df_lis.columns}
print(f"Number of Null values in each column: ")
for key, value in Dict_Null.items():
    print(key, ": ", value)

Number of Null values in each column: 
id_listing_anon :  0
room_type :  0
listing_neighborhood :  0
total_reviews :  0


## users.csv

In [11]:
df_user = spark.read.option('header', 'true').csv("data/users.csv", inferSchema=True)
df_user.show(5)

+--------------------+-------+---------------------+
|        id_user_anon|country|words_in_user_profile|
+--------------------+-------+---------------------+
|1d16a001-31a2-494...|     FR|                    0|
|42607e0a-86c0-472...|     AR|                    0|
|25f85eb5-a700-44e...|     BR|                    0|
|55abeba0-18ef-4c5...|     BR|                    1|
|5d62d35a-7d6d-45d...|     BR|                   98|
+--------------------+-------+---------------------+
only showing top 5 rows



In [12]:
df_user.printSchema()

root
 |-- id_user_anon: string (nullable = true)
 |-- country: string (nullable = true)
 |-- words_in_user_profile: integer (nullable = true)



In [13]:
count_dup_rows = df_user.count() - df_user.distinct().count()
print(f"Number of duplicate rows: {count_dup_rows}")

Number of duplicate rows: 68


In [14]:
df_user = df_user.dropDuplicates()

In [15]:
count_dup_rows = df_user.count() - df_user.distinct().count()
print(f"Number of duplicate rows: {count_dup_rows}")

Number of duplicate rows: 0


In [16]:
Dict_Null = {col:df_user.filter(df_user[col].isNull()).count() for col in df_user.columns}
print(f"Number of Null values in each column: ")
for key, value in Dict_Null.items():
    print(key, ": ", value)

Number of Null values in each column: 
id_user_anon :  0
country :  0
words_in_user_profile :  0


## Explore Listings

In [23]:
df_lis.orderBy("total_reviews").tail(5)

[Row(id_listing_anon='dc3a2aaf-fae4-4b2f-9694-6dc978e8760e', room_type='Entire home/apt', listing_neighborhood='Santa Teresa', total_reviews=182.0),
 Row(id_listing_anon='ead9b36a-1b42-4b2d-a367-baf27dc0251d', room_type='Entire home/apt', listing_neighborhood='Copacabana', total_reviews=182.0),
 Row(id_listing_anon='c0a6e46f-ea7e-43b4-aa79-87081df96949', room_type='Entire home/apt', listing_neighborhood='Copacabana', total_reviews=185.0),
 Row(id_listing_anon='9bbe37a8-2688-4a23-bacf-405535769d88', room_type='Entire home/apt', listing_neighborhood='Ipanema', total_reviews=209.0),
 Row(id_listing_anon='77b09acc-9bc4-4843-9911-e76f4a73d142', room_type='Private room', listing_neighborhood='Santa Teresa', total_reviews=268.0)]

In [20]:
df_lis.groupBy("room_type").count().show()

+---------------+-----+
|      room_type|count|
+---------------+-----+
|    Shared room|  372|
|Entire home/apt| 9647|
|   Private room| 3019|
+---------------+-----+



In [22]:
df_lis.groupBy("listing_neighborhood").count().sort("count", ascending=False).show()

+--------------------+-----+
|listing_neighborhood|count|
+--------------------+-----+
|           -unknown-| 6221|
|          Copacabana| 2531|
|             Ipanema| 1041|
|     Barra da Tijuca|  593|
|              Leblon|  458|
|            Botafogo|  345|
|        Santa Teresa|  243|
|            Flamengo|  171|
|                Lapa|  171|
|                Leme|  168|
|Recreio dos Bande...|  164|
|         Laranjeiras|  120|
|              Tijuca|   82|
|               Lagoa|   68|
|              Glória|   68|
|              Catete|   59|
|               Gávea|   54|
|     Jardim Botânico|   52|
|              Centro|   51|
|             Humaitá|   43|
+--------------------+-----+
only showing top 20 rows



## Explore users

In [18]:
df_user.groupBy("country").count().sort("count", ascending=False).show()

+-------+-----+
|country|count|
+-------+-----+
|     BR|19568|
|     US| 2878|
|     AR| 1770|
|     FR| 1164|
|     GB|  975|
|     DE|  610|
|     CL|  410|
|     CA|  387|
|     AU|  367|
|     NL|  271|
|     ES|  243|
|     CO|  241|
|     IT|  228|
|     CH|  227|
|     UY|  206|
|     MX|  173|
|     PE|  119|
|     PT|  110|
|     BE|  100|
|     DK|   77|
+-------+-----+
only showing top 20 rows



## Join all tables

In [19]:
df_joined = df_con.join(df_user, df_user.id_user_anon == df_con.id_guest_anon)
df_joined = df_joined.join(df_lis, df_lis.id_listing_anon == df_joined.id_listing_anon) 
df_joined.show(1)

+--------------------+--------------------+--------------------+--------------------+-------------------+--------------------+-------------------+----------------+-----------------+--------+--------------+------------------------------------+---------------------+----------------------+--------------------+-------+---------------------+--------------------+---------------+--------------------+-------------+
|       id_guest_anon|        id_host_anon|     id_listing_anon|ts_interaction_first|  ts_reply_at_first|ts_accepted_at_first|      ts_booking_at|ds_checkin_first|ds_checkout_first|m_guests|m_interactions|m_first_message_length_in_characters|contact_channel_first|guest_user_stage_first|        id_user_anon|country|words_in_user_profile|     id_listing_anon|      room_type|listing_neighborhood|total_reviews|
+--------------------+--------------------+--------------------+--------------------+-------------------+--------------------+-------------------+----------------+---------------

24/12/02 11:05:33 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
