In [None]:
# Show what is stored under the /mnt/s3data mount before loading anything.
s3data_entries = dbutils.fs.ls("/mnt/s3data")
display(s3data_entries)

In [None]:
# Where the listings file lives and which reader format to use.
file_location = "dbfs:/mnt/s3data/listing.csv"
file_type = "csv"

# CSV parsing settings shared by the reader call below.
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# Read the listings CSV. The multiline/quote/escape options are set in addition
# to the usual header/separator/schema-inference options; these options only
# apply to CSV input and are ignored for other file types.
df_listing = (
    spark.read.format(file_type)
    .option("multiline", "true")
    .option("quote", "\"")
    .option("escape", "\"")
    .option("inferSchema", infer_schema)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .load(file_location)
)

display(df_listing)

In [None]:
from pyspark.sql.functions import *
from pyspark.sql.types import *


# Replace scrape_id with the same values cast to double.
scrape_id_as_double = col("scrape_id").cast(DoubleType())
df_listing = df_listing.withColumn("scrape_id", scrape_id_as_double)

In [None]:
from pyspark.sql.types import *

# File location and type
file_location = "dbfs:/mnt/s3data/neighbourhoods.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# Expected layout of the neighbourhoods file: two nullable string columns.
# NOTE(review): this schema is defined but not passed to the reader below;
# chain .schema(neighbourhoodSchema) on the reader if a fixed schema is intended.
neighbourhoodSchema = StructType([
    StructField("neighbourhood_group", StringType(), True),
    StructField("neighbourhood", StringType(), True)
])

# The applied options are for CSV files. For other file types, these will be ignored.
# The schema-inference key is spelled "inferSchema", matching the other load
# cells; "infer_schema" is not a CSV reader option.
df_neighbourhoods = (
    spark.read.format(file_type)
    .option("inferSchema", infer_schema)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .load(file_location)
)

display(df_neighbourhoods)


In [None]:
# Location of the reviews data (a directory path) and the reader format.
file_location = "dbfs:/mnt/s3data/reviews_fixed/"
file_type = "csv"

# CSV parsing settings shared by the reader call below.
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# Read the reviews CSV data with header, separator and schema inference.
# These options only apply to CSV input and are ignored for other file types.
df_reviews = (
    spark.read.format(file_type)
    .option("inferSchema", infer_schema)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .load(file_location)
)

display(df_reviews)

In [None]:
%sql

-- Preview every column of the `listing` table/view.
-- NOTE(review): no visible cell registers `listing`; presumably it is created
-- elsewhere (e.g. a saved table or temp view) -- confirm before Run All.
select * from listing

In [None]:
%sql

-- Preview every column of the `neighbourhood` table/view.
-- NOTE(review): no visible cell registers `neighbourhood`; presumably it is
-- created elsewhere -- confirm before Run All.
select * from neighbourhood

In [None]:
%sql

-- Preview every column of the `reviews` table/view.
-- NOTE(review): no visible cell registers `reviews`; presumably it is created
-- elsewhere -- confirm before Run All.
select * from reviews

In [None]:
# Number of listings for each property type.
property_type_counts = df_listing.groupBy('property_type').count()
display(property_type_counts)

In [None]:
from pyspark.sql.functions import *

# Property types ordered from the most to the least common.
(
    df_listing
    .groupBy('property_type')
    .count()
    .orderBy('count', ascending=False)
    .show()
)

In [None]:
# Average price per property type.
# `price` is a currency string (other cells strip '$' from it before casting),
# so it is parsed to a number first. Grouping is by property_type only:
# grouping by price as well would put every distinct price in its own group
# and make the average meaningless.
price_numeric = regexp_replace(col("price"), "[$,]", "").cast("float")
(
    df_listing
    .select("property_type", price_numeric.alias("price_f"))
    .groupBy("property_type")
    .agg({"price_f": "average"})
    .show()
)

In [None]:
# Print the column names and types of the listings frame.
df_listing.printSchema()

In [None]:
%sql
-- List the distinct raw price values present in `listing`.
SELECT DISTINCT price
FROM listing

In [None]:
from pyspark.sql.types import FloatType

def trim_char(string):
    """Turn a currency string such as "$1,200.00" into a castable number string.

    Args:
        string: Price text, or None for a missing value.

    Returns:
        The text with leading/trailing '$' removed and thousands separators
        (',') dropped, or None when the input is None.
    """
    # Missing prices reach the UDF as None; pass them through instead of
    # raising AttributeError on None.strip.
    if string is None:
        return None
    # Commas must go too, otherwise "1,200.00" cannot be cast to a number.
    return string.strip('$').replace(',', '')

# DataFrame-side wrapper around trim_char. No return type is passed, so the
# library default applies (callers cast the result themselves).
trim_func_udf = udf(trim_char)

# Expose the same function to SQL cells under the name `trim_func`.
spark.udf.register("trim_func", trim_char)


In [None]:
# Average numeric price per property type: strip the '$' via the UDF, cast to
# float as `price_f`, then average within each property type.
price_as_float = trim_func_udf("price").cast(FloatType()).alias("price_f")
avg_price_by_property = (
    df_listing
    .select("property_type", "price", price_as_float)
    .groupBy("property_type")
    .agg({"price_f": "average"})
)
avg_price_by_property.show()

In [None]:
# Same aggregation as the previous cell: average of the float-cast price
# (`price_f`) for each property type, printed as text.
price_as_float = trim_func_udf("price").cast(FloatType()).alias("price_f")
avg_price_by_property = (
    df_listing
    .select("property_type", "price", price_as_float)
    .groupBy("property_type")
    .agg({"price_f": "average"})
)
avg_price_by_property.show()

In [None]:
# Average of the float-cast price (`price_f`) per property type, rendered with
# display() instead of show().
price_as_float = trim_func_udf("price").cast(FloatType()).alias("price_f")
avg_price_by_property = (
    df_listing
    .select("property_type", "price", price_as_float)
    .groupBy("property_type")
    .agg({"price_f": "average"})
)
display(avg_price_by_property)

In [None]:
import pyspark.sql.functions as f
from pyspark.sql.window import Window
# Listings per neighbourhood, with the aggregate column renamed to `count`.
df_neigh_count = (
    df_listing
    .groupBy("neighbourhood")
    .agg({"neighbourhood": "count"})
    .withColumnRenamed("count(neighbourhood)", "count")
)

# Share of all counted listings held by each neighbourhood, as a percentage
# rounded to 3 decimals. The empty partitionBy() makes the window span every
# row, so f.sum yields the grand total. f.round is referenced through the
# module alias so the cell does not depend on a star import having replaced
# Python's builtin round().
whole_frame = Window.partitionBy()
percentage = f.round(f.col('count') / f.sum('count').over(whole_frame) * 100, 3)
df_neigh_count = df_neigh_count.withColumn('percentage', percentage)
display(df_neigh_count)

In [None]:
# Neighbourhoods ordered by their share of listings, largest first.
neigh_count_by_share = df_neigh_count.orderBy('percentage', ascending=False)
display(neigh_count_by_share)

In [None]:
%sql
-- Preview every column of the `neighbourhood` table/view.
-- NOTE(review): no visible cell registers `neighbourhood`; presumably it is
-- created elsewhere -- confirm before Run All.
select * from neighbourhood

In [None]:
# Attach the neighbourhood group to each per-neighbourhood count by matching on
# the neighbourhood name; only neighbourhoods present in both frames remain.
neigh_counts = df_neigh_count.alias("nc")
neigh_lookup = df_neighbourhoods.alias("ne")
same_neighbourhood = col("nc.neighbourhood") == col("ne.neighbourhood")

nei_group = (
    neigh_counts
    .join(neigh_lookup, same_neighbourhood)
    .select("nc.*", "ne.neighbourhood_group")
)

display(nei_group)

In [None]:
# Same joined frame, ordered by percentage from highest to lowest.
nei_group_by_share = nei_group.sort(desc('percentage'))
display(nei_group_by_share)

In [None]:
from pyspark.sql.functions import isnan, when, count, col
# Count missing values per column of the listings frame.
# isnan() only accepts float/double input, so it is applied to those columns
# alone; applying it to e.g. timestamp or boolean columns fails analysis.
# Every other column is checked for NULL only.
nan_capable_types = ("float", "double")
missing_counts = [
    count(
        when(
            (isnan(c) | col(c).isNull()) if dtype in nan_capable_types else col(c).isNull(),
            c,
        )
    ).alias(c)
    for c, dtype in df_listing.dtypes
]
display(df_listing.select(missing_counts))

In [None]:
%sql
-- Preview every column of the `reviews` table/view.
-- NOTE(review): no visible cell registers `reviews`; presumably it is created
-- elsewhere -- confirm before Run All.
select * from reviews

In [None]:
# Print the query plan Spark would use to count reviews per listing_id
# (nothing is computed here; explain() only shows the plan).
df_reviews.groupBy("listing_id").count().explain()

In [None]:
# Number of reviews per listing, most-reviewed first. The join keeps only
# listings that have at least one matching review row.
reviews_per_listing = (
    df_listing.alias("list")
    .join(df_reviews.alias("rev"), col("list.id") == col("rev.listing_id"))
    .select("list.id")
    .groupBy("list.id")
    .count()
    .sort(desc("count"))
)
reviews_per_listing.show()

In [None]:
# Total number of rows in the listings frame.
df_listing.count()

In [None]:
# Count the distinct listings that have no matching review: left-join reviews
# onto listings and keep the rows where the review side is NULL.
listings_left_join_reviews = df_listing.alias("list").join(
    df_reviews.alias("rev"),
    col("list.id") == col("rev.listing_id"),
    how="left",
)
(
    listings_left_join_reviews
    .select("list.id", "rev.listing_id")
    .filter(col("rev.listing_id").isNull())
    .distinct()
    .count()
)

In [None]:
# Reviewers with more than 30 reviews, busiest first.
frequent_reviewers = df_reviews.groupBy("reviewer_id").count().filter(col("count") > 30)
frequent_reviewers.orderBy('count', ascending=False).show()

In [None]:
# Same query as the previous cell: reviewers with more than 30 reviews,
# ordered by review count descending.
frequent_reviewers = df_reviews.groupBy("reviewer_id").count().filter(col("count") > 30)
frequent_reviewers.orderBy('count', ascending=False).show()

In [None]:
# Summary statistics for the reviews frame, as produced by describe().
display(df_reviews.describe())

In [None]:
# Keep only review rows whose reviewer_id parses as an integer; rows where the
# cast yields NULL are dropped.
# The cast targets "long" rather than "int" so that ids beyond the 32-bit range
# are not turned into NULL and filtered out by mistake. No alias is attached:
# naming a filter predicate has no effect on the result.
reviewer_id_is_integer = f.col("reviewer_id").cast("long").isNotNull()
df_review_fil = df_reviews.filter(reviewer_id_is_integer)
display(df_review_fil)

In [None]:
# Print the column names and types of the filtered reviews frame.
df_review_fil.printSchema()

In [None]:
# Reviewers with more than 30 reviews in the filtered frame, busiest first.
frequent_reviewers_fil = (
    df_review_fil
    .groupBy("reviewer_id")
    .count()
    .filter(col("count") > 30)
)
frequent_reviewers_fil.orderBy('count', ascending=False).show()

In [None]:
# Review count per reviewer, restricted to reviewers with more than 30 reviews.
reviews_per_reviewer = df_review_fil.groupBy("reviewer_id").count()
df_review_cnt = reviews_per_reviewer.filter(col("count") > 30)
display(df_review_cnt)

In [None]:
from datetime import datetime

def _parse_review_date(raw):
    """Parse an 'MM/DD/YYYY' string into a date.

    Args:
        raw: Date text, or None for a missing value.

    Returns:
        A datetime.date, or None when the input is None or does not match the
        '%m/%d/%Y' format.
    """
    # A NULL cell arrives as None; strptime would raise TypeError on it.
    if raw is None:
        return None
    try:
        # The UDF is declared as DateType, so hand back a date, not a datetime.
        return datetime.strptime(raw, '%m/%d/%Y').date()
    except ValueError:
        # One malformed value should yield NULL rather than fail the whole job.
        return None


# DataFrame-side UDF producing a DateType column.
dfunc = udf(_parse_review_date, DateType())

# Expose the same UDF to SQL cells under the name `datec`.
spark.udf.register("datec", dfunc)

In [None]:
# Preview the filtered reviews with the parsed date added as `datetime`.
parsed_review_date = dfunc(col("date"))
df_review_fil.withColumn("datetime", parsed_review_date).show()

In [None]:
# Earliest and latest parsed review date for each reviewer. The aggregate
# columns keep their generated names, min(datetime) and max(datetime).
reviews_with_dates = df_review_fil.withColumn("datetime", dfunc(col("date")))
df_review_dt = (
    reviews_with_dates
    .groupBy("reviewer_id")
    .agg(f.min('datetime'), f.max('datetime'))
)
display(df_review_dt)

In [None]:
# Combine each frequent reviewer's review count with their first/last review
# dates, add the number of days between those dates as `date_diff`, and order
# by review count descending.
counts_with_dates = df_review_cnt.join(df_review_dt, "reviewer_id")
days_between_first_and_last = datediff("max(datetime)", "min(datetime)")
df_review_summary = (
    counts_with_dates
    .withColumn("date_diff", days_between_first_and_last)
    .orderBy('count', ascending=False)
)

In [None]:
# Render the per-reviewer summary (count, first/last review date, date_diff).
display(df_review_summary)

In [None]:
# Register the summary as the temp view `review_summary` so the SQL cells can query it.
df_review_summary.createOrReplaceTempView("review_summary")

In [None]:
%sql

-- Preview every column of the `review_summary` temp view.
select * from review_summary

In [None]:
%sql

-- Show the column names and types of the `review_summary` temp view.
describe review_summary

In [None]:
%sql

-- Days between a reviewer's first and last review divided by their review
-- count, smallest value first.
SELECT
  reviewer_id,
  date_diff/count AS avg_days
FROM review_summary
ORDER BY avg_days ASC

In [None]:
%sql

-- Review count for every listing, including listings with no reviews.
-- The listing is identified by the listing-side id (a.id): the reviews-side
-- listing_id is NULL for unreviewed listings under this LEFT JOIN, which would
-- collapse all of them onto a single NULL key. count(b.listing_id) counts only
-- matched review rows, so unreviewed listings get review_cnt = 0.
SELECT DISTINCT
  a.id AS listing_id,
  neighbourhood,
  price,
  count(b.listing_id) OVER (PARTITION BY a.id) AS review_cnt
FROM listing a
LEFT JOIN reviews b ON a.id = b.listing_id

In [None]:
%sql

-- Rank listings inside each neighbourhood by number of reviews (dense_rank,
-- most-reviewed first).
-- review_cnt counts matched review rows (b.listing_id), not the listing's own
-- id: under a LEFT JOIN an unreviewed listing still yields one row, so counting
-- id would report 1 review instead of 0.
SELECT
  id, neighbourhood, price, review_cnt,
  dense_rank() OVER (PARTITION BY neighbourhood ORDER BY review_cnt DESC) AS rank
FROM (
  SELECT DISTINCT
    a.id,
    neighbourhood,
    price,
    count(b.listing_id) OVER (PARTITION BY a.id) AS review_cnt
  FROM listing a
  LEFT JOIN reviews b ON a.id = b.listing_id
)

In [None]:
%sql
-- Listings holding dense rank 1 or 2 by review count within their
-- neighbourhood.
-- review_cnt counts matched review rows (b.listing_id), not the listing's own
-- id: under a LEFT JOIN an unreviewed listing still yields one row, so counting
-- id would report 1 review instead of 0.
SELECT
  id, neighbourhood, price, review_cnt, rank
FROM (
  SELECT
    id, neighbourhood, price, review_cnt,
    dense_rank() OVER (PARTITION BY neighbourhood ORDER BY review_cnt DESC) AS rank
  FROM (
    SELECT DISTINCT
      a.id,
      neighbourhood,
      price,
      count(b.listing_id) OVER (PARTITION BY a.id) AS review_cnt
    FROM listing a
    LEFT JOIN reviews b ON a.id = b.listing_id
  )
)
WHERE rank <= 2

In [None]:
%sql

-- Average price (after trim_func strips the currency symbol) and number of
-- listings for each room type.
SELECT
  room_type,
  avg(trim_func(price)) AS avg_price,
  count(*) AS count
FROM listing
GROUP BY room_type

In [None]:
%sql
-- Reviews whose comments contain one of the words good / excellent / amazing,
-- shown together with the first such word matched. The match is done by
-- regexp_extract, which returns '' when nothing matches.
SELECT
  regexp_extract(comments, '(good|excellent|amazing)', 1),
  comments
FROM reviews
WHERE regexp_extract(comments, '(good|excellent|amazing)', 1) != ''

In [None]:
%sql
-- Reviews whose comments contain one of the words bad / worse / wrong, shown
-- together with the first such word matched. The match is done by
-- regexp_extract, which returns '' when nothing matches.
SELECT
  regexp_extract(comments, '(bad|worse|wrong)', 1),
  comments
FROM reviews
WHERE regexp_extract(comments, '(bad|worse|wrong)', 1) != ''

In [None]:
%sql

-- Split the amenities text on commas and emit one row per resulting element
-- (first 10 rows only), alongside the original text and the split array.
SELECT amenities, split, exploded
FROM (
  SELECT
    amenities,
    split(amenities, ",") AS split
  FROM listing
)
LATERAL VIEW explode(split) AS exploded
LIMIT 10

In [None]:
%sql

-- Listings in the 'Corona' neighbourhood ordered by numeric price, each
-- assigned to one of five buckets by ntile(5) over that same ordering.
SELECT
  price,
  CAST(trim_func(price) AS float),
  ntile(5) OVER (ORDER BY CAST(trim_func(price) AS float))
FROM listing
WHERE neighbourhood='Corona'
ORDER BY CAST(trim_func(price) AS float)

In [None]:
%sql

-- For reviews whose date text contains '2017', pair each review date with the
-- same reviewer's previous review date (via lag over the parsed date), keeping
-- only rows that have a previous date.
SELECT reviewer_id, cur_date, prev_date
FROM (
  SELECT
    reviewer_id,
    datec(date) AS cur_date,
    lag(datec(date)) OVER (PARTITION BY reviewer_id ORDER BY datec(date)) AS prev_date
  FROM reviews
  WHERE date IS NOT NULL AND date LIKE '%2017%'
  ORDER BY datec(date)
)
WHERE prev_date IS NOT NULL

In [None]:
%sql

-- Cumulative distribution of total_spent across the 20 reviewers with the
-- highest totals, where total_spent sums the price of every listing a reviewer
-- has reviewed.
-- Reviews are linked to listings through r.listing_id = l.id; joining on
-- r.reviewer_id would compare a reviewer id with a listing id and pair
-- unrelated rows.
SELECT
  reviewer_id,
  total_spent,
  CUME_DIST() OVER (ORDER BY total_spent) AS cum_dist
FROM (
  SELECT
    reviewer_id,
    sum(CAST(trim_func(l.price) AS float)) AS total_spent
  FROM reviews AS r
  JOIN listing AS l ON r.listing_id = l.id
  GROUP BY reviewer_id
  ORDER BY 2 DESC
  LIMIT 20
)
WHERE total_spent IS NOT NULL