In [1]:
import re
from itertools import groupby

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

from sparkpayload import listings_reviews

# Reuse the active SparkSession if there is one, otherwise start a new one.
spark = (
    SparkSession.builder
    .appName("Scannning Through the data ")
    .getOrCreate()
)

26/01/31 11:02:51 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [2]:
# Load the gzip-compressed listings CSV. Quoted fields may span several lines
# (multiLine) and embedded quotes are escaped by doubling them (escape='"').
listings = (
    spark.read
    .options(
        header=True,
        inferSchema=True,
        sep=",",
        quote='"',
        escape='"',
        multiLine=True,
        mode="PERMISSIVE",
    )
    .csv("../data/listings.csv.gz")
)

                                                                                

In [3]:
# Print every StructField of the inferred listings schema.
# The loop variable is `field`, not `list`: a module-level loop variable named
# `list` would shadow the built-in for the rest of the notebook session.
for field in listings.schema:
    print(field)

StructField('id', LongType(), True)
StructField('listing_url', StringType(), True)
StructField('scrape_id', LongType(), True)
StructField('last_scraped', DateType(), True)
StructField('source', StringType(), True)
StructField('name', StringType(), True)
StructField('description', StringType(), True)
StructField('neighborhood_overview', StringType(), True)
StructField('picture_url', StringType(), True)
StructField('host_id', IntegerType(), True)
StructField('host_url', StringType(), True)
StructField('host_name', StringType(), True)
StructField('host_since', DateType(), True)
StructField('host_location', StringType(), True)
StructField('host_about', StringType(), True)
StructField('host_response_time', StringType(), True)
StructField('host_response_rate', StringType(), True)
StructField('host_acceptance_rate', StringType(), True)
StructField('host_is_superhost', StringType(), True)
StructField('host_thumbnail_url', StringType(), True)
StructField('host_picture_url', StringType(), True)


In [7]:
# Number of listings and mean location score for each property type,
# most common property types first.
(
    listings
    .groupby('property_type')
    .agg(
        F.count('property_type').alias('count'),
        F.avg('review_scores_location').alias('avg_review_scores_location'),
    )
    .orderBy(F.col('count').desc())
    .show(truncate=False)
)



                                                                                

In [11]:
# Load the gzip-compressed reviews CSV with the same parsing options as the
# listings file: multi-line quoted fields and doubled-quote escaping.
reviews = (
    spark.read
    .options(
        header=True,
        inferSchema=True,
        sep=",",
        quote='"',
        escape='"',
        multiLine=True,
        mode="PERMISSIVE",
    )
    .csv("../data/reviews.csv.gz")
)

                                                                                

In [12]:
# One line per StructField of the inferred reviews schema.
for review_field in reviews.schema:
    print(review_field)

StructField('listing_id', LongType(), True)
StructField('id', LongType(), True)
StructField('date', DateType(), True)
StructField('reviewer_id', IntegerType(), True)
StructField('reviewer_name', StringType(), True)
StructField('comments', StringType(), True)


In [14]:
# Number of reviews per listing_id.
# Keep the DataFrame itself: `.show()` returns None, so chaining it into the
# assignment (as before) left `reviews_per_listings` bound to None.
reviews_per_listings = reviews.groupby('listing_id').count()
reviews_per_listings.show(10)

                                                                                

### Computing the total number of listings and average review score per host

In [18]:
# Computing the total number of listings and average review score per host
from pyspark.sql.functions import avg, count

# - No null filter: avg() already ignores nulls, and filtering on the rating
#   would have excluded unrated listings from `total_listings`.
# - The average is taken over review_scores_rating (the overall review score),
#   matching both the heading and the column the old filter checked; the old
#   code filtered on rating but averaged review_scores_location.
# - `.show()` returns None, so it is called separately to keep `host_stats`
#   bound to the aggregated DataFrame.
host_stats = (
    listings
    .groupby('host_id')
    .agg(
        count('id').alias('total_listings'),
        avg('review_scores_rating').alias('avg_review_scores_rating'),
    )
)
host_stats.show(10)

                                                                                

### Finding the top 10 listings with the highest number of reviews

In [20]:
# The ten listing_ids with the most review rows, highest count first.
(
    reviews
    .groupBy('listing_id')
    .count()
    .orderBy(F.desc('count'))
    .limit(10)
    .show()
)

                                                                                

### Top 5 neighbourhoods with the most listings

In [21]:
# The five neighbourhoods (cleansed names) with the largest listing counts.
(
    listings
    .groupBy('neighbourhood_cleansed')
    .count()
    .orderBy(F.col('count').desc())
    .limit(5)
    .show()
)

                                                                                

In [22]:
# Inner join: one row per (listing, review) pair; listings without any
# review are dropped.
listings_reviews = listings.join(
    reviews,
    on=listings.id == reviews.listing_id,
    how='inner',
)

In [25]:
# Review count per listing (id + name) computed from the joined DataFrame,
# most-reviewed first.
# `.show()` returns None, so the DataFrame is assigned first and displayed
# separately; previously `reviews_per_listings` ended up as None.
reviews_per_listings = (
    listings_reviews
    .groupBy(listings.id, listings.name)
    .agg(F.count(reviews.id).alias('num_reviews'))
    .orderBy('num_reviews', ascending=False)
)
reviews_per_listings.show(truncate=False)


                                                                                

In [27]:
# Using Spark SQL on these datasets:
# register each DataFrame as a temp view (tied to this SparkSession) under
# the same name as its variable, so SQL text can refer to it directly.
for view_name, frame in (("listings", listings), ("reviews", reviews)):
    frame.createOrReplaceTempView(view_name)

26/02/03 09:39:56 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [28]:
# Spark SQL version of the join + count above: number of reviews per listing,
# most-reviewed first. Relies on the `listings` / `reviews` temp views.
query = """
select
    listings.id,
    listings.name,
    COUNT(reviews.id) AS num_reviews
FROM
    listings
INNER JOIN
reviews
ON
    listings.id = reviews.listing_id
GROUP BY
listings.id,
listings.name
ORDER BY
    num_reviews DESC
"""

# Keep the resulting DataFrame, then display it without truncating names.
reviews_per_listings = spark.sql(query)
reviews_per_listings.show(truncate=False)

                                                                                

### Bucketing listings into price ranges

In [34]:
from pyspark.sql.functions import regexp_replace, udf, col
from pyspark.sql.types import StringType


# Strip '$' and thousands separators from the price string, then cast to float.
# Strings that still fail to parse become null.
listings = listings.withColumn(
    'price_numeric',
    regexp_replace(col('price'), '[$,]', '').cast('float'),
)

def price_string_matching(pricing):
    """Map a numeric price onto a coarse, human-readable price band.

    Bands:
        below 50            -> "Budget"
        50 up to (excl.) 150 -> "Mid range"
        150 and above        -> "High end Luxury"

    A missing price (None), or a value for which none of the comparisons
    hold (e.g. NaN), yields "Range not found".
    """
    if pricing is None:
        return "Range not found"
    # (exclusive upper bound, label) pairs checked in order; first match wins.
    for upper_bound, band in ((50, "Budget"), (150, "Mid range")):
        if pricing < upper_bound:
            return band
    return "High end Luxury" if pricing >= 150 else "Range not found"

# Wrap the bucketing function as a Spark UDF returning strings.
price_udf = udf(price_string_matching, returnType=StringType())

# Show the first 20 listings together with their derived price band.
(
    listings
    .withColumn("price_ranging", price_udf(col("price_numeric")))
    .select("id", "name", "property_type", "price_ranging")
    .show(20, truncate=False)
)

+-----+-------------------------------------------------+---------------------------+---------------+
|id   |name                                             |property_type              |price_ranging  |
+-----+-------------------------------------------------+---------------------------+---------------+
|13913|Holiday London DB Room Let-on going              |Private room in rental unit|Mid range      |
|15400|Bright Chelsea  Apartment. Chelsea!              |Entire rental unit         |Mid range      |
|17402|Very Central Modern 3-Bed/2 Bath By Oxford St W1 |Entire rental unit         |High end Luxury|
|24328|Battersea live/work artist house                 |Entire townhouse           |Range not found|
|36274|Bright 1 bedroom apt off brick lane in Shoreditch|Entire condo               |High end Luxury|
|36299|Kew Gardens 3BR house in cul-de-sac              |Entire townhouse           |High end Luxury|
|36660|You are GUARANTEED to love this                  |Private room in home     

#### Deriving sentiment scores from the `comments` column

In [39]:
from pyspark.sql.types import FloatType

# Small hand-picked sentiment lexicons. sentiment_score matches whole words,
# so entries must be spelled exactly as they appear in review text
# ('fantastic' and 'disappointing' were previously misspelled and never matched).
positive_words = {'good', 'great', 'excellent', 'amazing', 'fantastic',
                  'wonderful', 'pleasant', 'lovely', 'nice', 'enjoyed'}
# Backward-compatible alias for the original (misspelled) variable name.
postive_words = positive_words
negative_words = {'bad', 'terrible', 'awful', 'horrible', 'disappointing',
                  'poor', 'hate', 'unpleasant', 'dirty', 'noisy'}

# Plain Python scoring function; it is wrapped as a Spark UDF in the next step.
def sentiment_score(comment):
    """Return a crude lexicon-based sentiment score for one review comment.

    The score is the number of distinct positive lexicon words present minus
    the number of distinct negative lexicon words present. Each word counts
    at most once, however often it is repeated.

    Args:
        comment: review text, or None (a null value coming from Spark).

    Returns:
        The score as a float; 0.0 when ``comment`` is None.
    """
    if comment is None:
        return 0.0
    # Match whole words only. Plain substring checks made "unpleasant" also
    # count as "pleasant" (netting 0) and "whatever" count as "hate".
    tokens = set(re.findall(r"\w+", comment.lower()))
    return float(len(tokens & positive_words) - len(tokens & negative_words))

# Wrap the Python scoring function as a Spark UDF returning FloatType.
# F.* is used instead of the bare `udf`/`col`/`avg` names, so this cell does
# not depend on imports made in other cells.
sentiment_score_udf = F.udf(sentiment_score, FloatType())

reviews_with_sentiment = reviews.withColumn(
    'sentiment_score', sentiment_score_udf(F.col('comments'))
)

# Average sentiment per listing, highest first. The score is coarse, so many
# listings tie on the average; listing_id is a secondary sort key so the
# displayed top 20 is deterministic across runs.
(
    listings
    .join(reviews_with_sentiment,
          listings.id == reviews_with_sentiment.listing_id,
          how='inner')
    .groupby('listing_id', 'name')
    .agg(F.avg('sentiment_score').alias('avg_sentimental_score'))
    .orderBy(F.desc('avg_sentimental_score'), F.asc('listing_id'))
    .show(20, truncate=False)
)


                                                                                