In [1]:
import os
import sys

# Make the PySpark workers and the driver use the same interpreter as this kernel.
os.environ.update({
    'PYSPARK_PYTHON': sys.executable,
    'PYSPARK_DRIVER_PYTHON': sys.executable,
})

In [2]:
import findspark

# findspark.init() is called before pyspark is imported, so the import order
# of this cell is deliberate.
findspark.init()

from pyspark.sql import SparkSession

# Create (or reuse) a session on the local master using all available cores,
# with the Spark UI bound to port 4050.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Spark APP")
    .config('spark.ui.port', '4050')
    .getOrCreate()
)
spark

In [11]:
from pyspark.sql import functions as f

In [13]:
# Load the listings CSV: the first line is the header and column types are inferred.
df = spark.read.csv(
    'new_york_listings_2024.csv',
    sep=',',
    header=True,
    inferSchema=True,
)

df.count()

20758

# Data Modeling

## Handling Duplicates

- Rows with different data but same ids
- Rows with different ids but same data
- Rows with same data and same id

In [14]:
# Compare the total number of rows with the number of fully distinct rows;
# a difference would reveal exact duplicates.
total_rows = df.count()
distinct_rows = df.distinct().count()

print(f'Count of rows: {total_rows}')
print(f'Count of distincts: {distinct_rows}')

Count of rows: 20758
Count of distincts: 20758


In this case the number of rows equals the number of distinct rows, which means there are no rows with exactly the same data. If there were duplicates, we would have to drop them

In [6]:
# Remove rows that are exact duplicates of another row (no column subset is
# given, so all columns are compared).
df = df.dropDuplicates()

Now let's check for rows that have the same data but different ids

In [15]:
# Compare the total row count with the number of distinct rows once the `id`
# column is left out: a smaller second number would mean the same listing is
# stored under several ids.
#
# The second label used to read "distinct ids", which is not what is counted.
# The count is also computed outside the f-string so the cell runs on Python
# versions older than 3.12 (no nested quotes or multi-line replacement field).
non_id_columns = [c for c in df.columns if c != 'id']
distinct_rows_ignoring_id = df.select(non_id_columns).distinct().count()

print(f'Count of ids: {df.count()}')
print(f'Count of distinct rows (ignoring id): {distinct_rows_ignoring_id}')

Count of ids: 20758
Count of distinct ids: 20758


Here we count the distinct rows after excluding the id column. If that count were lower than the total number of rows, it would mean that there are rows with the same data but different ids

That is not the case here, but if it were, we could drop duplicates on a subset of columns that excludes the id

```
df = df.dropDuplicates(subset=[c for c in df.columns if c != 'id'])
```

Now we must check if there are any duplicated ids

In [16]:
# Compare the number of ids with the number of distinct ids; the two values
# differ when some id is used by more than one row.
# Each aggregate gets its own alias: previously both columns were named
# 'count', which made the output ambiguous.
df.agg(
    f.count('id').alias('count'),
    f.countDistinct('id').alias('distinct_count')
).show()

+-----+-----+
|count|count|
+-----+-----+
|20758|20758|
+-----+-----+



For the cases in which the id column itself is unreliable, we can simply give each row a new unique id (the generated ids are guaranteed to be unique and increasing, but not consecutive)

```
df.withColumn('new_id', f.monotonically_increasing_id()).show()
```

## Handling Missing Data

- If our dataset can afford it, we might just drop the rows with missing data. But if that causes a significant reduction in the dataset size, we must look into each feature to find the ones that have holes in them. If a feature has most of its data missing, it is fairly useless

- If a feature is discrete boolean, we can turn it into a categorical variable by adding a 'Missing' category
- If a feature is categorical, we can extend the number of levels and add 'Missing' category as well
- If a feature is numerical we can impute either mean, median, mode or some other predefined value

In [17]:
# Fraction of missing observations in each column: count(c) is divided by the
# total row count given by count('*'), and the ratio is subtracted from 1.
missing_ratios = [
    (1 - (f.count(c) / f.count('*'))).alias(c + '_missing')
    for c in df.columns
]

df.agg(*missing_ratios).show()

+----------+------------+---------------+-----------------+---------------------------+---------------------+----------------+-----------------+-----------------+-------------+----------------------+-------------------------+-------------------+-------------------------+--------------------------------------+------------------------+-----------------------------+---------------+--------------+----------------+------------+-------------+
|id_missing|name_missing|host_id_missing|host_name_missing|neighbourhood_group_missing|neighbourhood_missing|latitude_missing|longitude_missing|room_type_missing|price_missing|minimum_nights_missing|number_of_reviews_missing|last_review_missing|reviews_per_month_missing|calculated_host_listings_count_missing|availability_365_missing|number_of_reviews_ltm_missing|license_missing|rating_missing|bedrooms_missing|beds_missing|baths_missing|
+----------+------------+---------------+-----------------+---------------------------+---------------------+---------

For the cases in which we decide to drop rows with missing observations, we can set the `thresh` parameter of the dropna function: it is the minimum number of non-null values a row must contain to be kept, so rows with fewer non-null values are dropped

```
df.dropna(thresh=7).show()
```

The other case is to impute the data with predefined values

```
# Impute with the column mean, restricted to numeric columns: the mean of a
# string or timestamp column is null, which fillna() cannot use as a value.
numeric_columns = [
    name for name, dtype in df.dtypes
    if dtype in ('int', 'double', 'float')
]

# The aggregation returns a single row; convert it to a {column: mean} dict.
# pandas expects the lowercase orient name 'records'.
means = df.agg(
    *[
        f.mean(c).alias(c) for c in numeric_columns
    ]
).toPandas().to_dict('records')[0]

df.fillna(means).show()
```

## Handling Outliers

In [20]:
# Flag outliers with the IQR rule: a value is an outlier when it lies more
# than 1.5 * IQR below the first quartile or above the third quartile.

# Numerical columns selected by Spark dtype name ('bigint' is not in the list).
numerical_cols = [name for name, dtype in df.dtypes if dtype in ('int', 'double', 'float')]

# approxQuantile accepts a list of columns and returns one [q1, q3] list per
# column, so all quartiles are obtained in a single call instead of one call
# per column. The last argument (0.05) is the relative error of the estimate.
quartiles = (
    df.approxQuantile(numerical_cols, [0.25, 0.75], 0.05)
    if numerical_cols else []
)

# bounds[column] = [lower fence, upper fence]
bounds = {}
for name, (q1, q3) in zip(numerical_cols, quartiles):
    iqr = q3 - q1
    bounds[name] = [q1 - 1.5 * iqr, q3 + 1.5 * iqr]

# One boolean `<column>_o` flag per numerical column, next to the row id.
outliers = df.select(
    'id',
    *[
        ((df[c] < bounds[c][0]) | (df[c] > bounds[c][1])).alias(c + '_o')
        for c in numerical_cols
    ]
)

outliers.show()

+------------------+---------+----------+-----------+-------+----------------+-------------------+-------------------+--------------------------------+------------------+-----------------------+------+
|                id|host_id_o|latitude_o|longitude_o|price_o|minimum_nights_o|number_of_reviews_o|reviews_per_month_o|calculated_host_listings_count_o|availability_365_o|number_of_reviews_ltm_o|beds_o|
+------------------+---------+----------+-----------+-------+----------------+-------------------+-------------------+--------------------------------+------------------+-----------------------+------+
|           1312228|    false|     false|      false|  false|           false|              false|              false|                           false|             false|                  false| false|
|          45277537|    false|     false|      false|  false|           false|              false|              false|                            true|             false|                  fals

# Descriptive Statistics

In [21]:
# Print the schema tree: column names, inferred types and nullability.
df.printSchema()

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- host_id: integer (nullable = true)
 |-- host_name: string (nullable = true)
 |-- neighbourhood_group: string (nullable = true)
 |-- neighbourhood: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- room_type: string (nullable = true)
 |-- price: double (nullable = true)
 |-- minimum_nights: integer (nullable = true)
 |-- number_of_reviews: integer (nullable = true)
 |-- last_review: timestamp (nullable = true)
 |-- reviews_per_month: double (nullable = true)
 |-- calculated_host_listings_count: integer (nullable = true)
 |-- availability_365: integer (nullable = true)
 |-- number_of_reviews_ltm: integer (nullable = true)
 |-- license: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- bedrooms: string (nullable = true)
 |-- beds: integer (nullable = true)
 |-- baths: string (nullable = true)



In [22]:
# Same information as (column name, type name) pairs; these type names are
# what the dtype-based column filters in this notebook compare against.
df.dtypes

[('id', 'bigint'),
 ('name', 'string'),
 ('host_id', 'int'),
 ('host_name', 'string'),
 ('neighbourhood_group', 'string'),
 ('neighbourhood', 'string'),
 ('latitude', 'double'),
 ('longitude', 'double'),
 ('room_type', 'string'),
 ('price', 'double'),
 ('minimum_nights', 'int'),
 ('number_of_reviews', 'int'),
 ('last_review', 'timestamp'),
 ('reviews_per_month', 'double'),
 ('calculated_host_listings_count', 'int'),
 ('availability_365', 'int'),
 ('number_of_reviews_ltm', 'int'),
 ('license', 'string'),
 ('rating', 'string'),
 ('bedrooms', 'string'),
 ('beds', 'int'),
 ('baths', 'string')]

At first we may want to check the balance in our categorical variables

In [23]:
# Candidate categorical features are the string-typed columns.
# The dtype is compared with ==: `in ('string')` has no trailing comma, so it
# was a substring test against the text 'string' rather than tuple membership.
categorical_columns = [name for name, dtype in df.dtypes if dtype == 'string']
categorical_columns

['name',
 'host_name',
 'neighbourhood_group',
 'neighbourhood',
 'room_type',
 'license',
 'rating',
 'bedrooms',
 'baths']

We can check the balance in data with the following code snippet

In [37]:
# Number of rows per neighbourhood_group value, to see how balanced the categories are.
df.groupBy('neighbourhood_group').count().show()

+-------------------+-----+
|neighbourhood_group|count|
+-------------------+-----+
|             Queens| 3761|
|           Brooklyn| 7719|
|      Staten Island|  291|
|          Manhattan| 8038|
|              Bronx|  949|
+-------------------+-----+



For the numerical features we may use the describe() method

In [38]:
# Summary statistics for the numerical columns only.
desc = df.describe(numerical_cols)
desc.show()

+-------+--------------------+-------------------+-------------------+------------------+------------------+-----------------+------------------+------------------------------+------------------+---------------------+------------------+
|summary|             host_id|           latitude|          longitude|             price|    minimum_nights|number_of_reviews| reviews_per_month|calculated_host_listings_count|  availability_365|number_of_reviews_ltm|              beds|
+-------+--------------------+-------------------+-------------------+------------------+------------------+-----------------+------------------+------------------------------+------------------+---------------------+------------------+
|  count|               20758|              20758|              20758|             20758|             20758|            20758|             20758|                         20758|             20758|                20758|             20758|
|   mean|1.7493156215092975E8|  40.72679825847375| -

We can see that some variables are skewed, with the maximum value being many times greater than the mean. Furthermore, there are features with a high standard deviation. We can check the skewness of a variable with the following code

In [39]:
# Dict form of agg: {column name: aggregate function name}.
df.agg({'price': 'skewness'}).show()

+-----------------+
|  skewness(price)|
+-----------------+
|89.99196126141626|
+-----------------+



# Correlations

In [41]:
# Calculating a correlation in PySpark is easy, but DataFrame.corr only handles one pair of columns per call

df.corr('price', 'minimum_nights')

-0.0065265311510967865

In [42]:
# Build an upper-triangular correlation matrix as a list of lists: row i is
# padded with None for the columns before i, followed by the correlation of
# column i with every column j >= i (so the diagonal is included).
n_numerical = len(numerical_cols)

corr = [
    [None] * row + [
        df.corr(numerical_cols[row], numerical_cols[other])
        for other in range(row, n_numerical)
    ]
    for row in range(n_numerical)
]

corr

[[1.0,
  0.012012421307592386,
  0.12240074989355525,
  -0.005987037347502429,
  -0.07250585448588709,
  -0.13995698152596134,
  0.1703660417597071,
  -0.049946729216101465,
  0.08272219880705102,
  0.10096630436451472,
  0.05659476453973668],
 [None,
  1.0,
  0.04628383949502985,
  -0.0011432424932324747,
  0.004126278257119762,
  -0.04837187179187074,
  -0.04194295174130962,
  0.07007686975253231,
  -0.0051209742774677835,
  -0.041816869752631734,
  -0.0700415572341627],
 [None,
  None,
  1.0,
  -0.03346040983685071,
  0.024145069426878704,
  0.005666195559932275,
  0.04257958096430123,
  -0.07220288369568262,
  0.06189594261392794,
  0.03368315810842712,
  0.0357136527652842],
 [None,
  None,
  None,
  1.0,
  -0.0065265311510967865,
  -0.012588027423888525,
  -0.009916846601852993,
  -0.0073326741599544,
  0.020151097567992438,
  -0.011262846660258334,
  0.06688227776077393],
 [None,
  None,
  None,
  None,
  1.0,
  -0.05919739795705836,
  -0.12253530983811105,
  0.01498057251420073