In [None]:
!pip install pyspark
!pip install -q findspark

In [None]:
import findspark
import os
import sys
import pandas as pd
from pandas import DataFrame
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.ticker as mtick
import matplotlib
from mpl_toolkits.mplot3d import Axes3D
import math
import folium
from IPython.core.interactiveshell import InteractiveShell
from datetime import *
import statistics as stats
# This helps auto print out the items without explixitly using 'print'
InteractiveShell.ast_node_interactivity = "all" 
%matplotlib inline

In [None]:
# Import PySpark related modules
import pyspark
from pyspark.rdd import RDD
from pyspark.sql import Row
from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql import functions
from pyspark.sql.functions import lit, desc, col, size, array_contains\
, isnan, udf, hour, array_min, array_max, countDistinct
from pyspark.sql.types import *

MAX_MEMORY = '15G'

# Local Spark configuration: master "local[*]", network/heartbeat timeouts,
# and MAX_MEMORY for both the driver and the executors.
conf = pyspark.SparkConf().setMaster("local[*]").setAll([
    ('spark.executor.heartbeatInterval', 10000),
    ('spark.network.timeout', 10000),
    ("spark.core.connection.ack.wait.timeout", "3600"),
    ("spark.executor.memory", MAX_MEMORY),
    ("spark.driver.memory", MAX_MEMORY),
])


def init_spark():
    """Return a SparkSession named "Pyspark guide" built from `conf` (reused if one exists)."""
    builder = SparkSession.builder.appName("Pyspark guide").config(conf=conf)
    return builder.getOrCreate()


spark = init_spark()


## Caricamento dei file .json del dataset Yelp

In [None]:
# Directory holding the Kaggle copy of the Yelp dataset.
YELP_DIR = '../input/yelp-dataset'


def read_yelp_json(entity):
    """Load ``yelp_academic_dataset_<entity>.json`` from YELP_DIR as a Spark DataFrame.

    The JSON source infers the schema by itself; the CSV-only options
    ``header``/``inferSchema`` have no effect on JSON input, so they are not passed.
    """
    return spark.read.json(f'{YELP_DIR}/yelp_academic_dataset_{entity}.json')


userDS = read_yelp_json('user')
businessDS = read_yelp_json('business')
reviewDS = read_yelp_json('review')
tipDS = read_yelp_json('tip')

In [None]:
# Total number of users on the platform: denominator of every share below.
TotalUsers = userDS.count()


def mean_of_nonzero(frame, column):
    """Return the mean of `column` over the rows of `frame` where it is non-zero.

    Zero values are excluded from the mean. Returns None when no row has a
    non-zero value (Spark's avg over no rows is null).
    """
    return frame.where(frame[column] != 0).agg(functions.avg(column)).first()[0]


def count_below(frame, column, threshold):
    """Return how many rows of `frame` have `column` strictly below `threshold`."""
    return frame.where(frame[column] < threshold).count()


# Fans: non-zero average, and share of ALL users strictly below it.
avg_fans = mean_of_nonzero(userDS, 'fans')
num_less_avg_fans = count_below(userDS, 'fans', avg_fans)
perc_inf_fans = num_less_avg_fans / TotalUsers

# Compliments: total received by each user across every compliment_* counter.
COMPLIMENT_COLUMNS = [
    "compliment_cool", "compliment_funny", "compliment_hot", "compliment_more",
    "compliment_profile", "compliment_cute", "compliment_list", "compliment_note",
    "compliment_plain", "compliment_writer", "compliment_photos",
]
# Builtin sum works on Columns (0 + Column -> Column).
compliments = (userDS
               .withColumn("COMPLIMENTS", sum(userDS[c] for c in COMPLIMENT_COLUMNS))
               .select("user_id", "COMPLIMENTS"))
avg_compliments = mean_of_nonzero(compliments, 'COMPLIMENTS')
num_less_avg_comp = count_below(compliments, 'COMPLIMENTS', avg_compliments)
perc_inf_compl = num_less_avg_comp / TotalUsers

# Votes sent by each user: useful + funny + cool.
sent_votes = (userDS
              .withColumn("SENT_VOTES", userDS["useful"] + userDS["funny"] + userDS["cool"])
              .select("user_id", "SENT_VOTES"))
avg_votes = mean_of_nonzero(sent_votes, 'SENT_VOTES')
num_less_avg_votes = count_below(sent_votes, 'SENT_VOTES', avg_votes)
perc_inf_vot = num_less_avg_votes / TotalUsers

# Reviews written by each user.
avg_review = mean_of_nonzero(userDS, 'review_count')
num_less_avg_rev = count_below(userDS, 'review_count', avg_review)
perc_inf_rev = num_less_avg_rev / TotalUsers

perc_inf = (perc_inf_rev, perc_inf_vot, perc_inf_compl, perc_inf_fans)
# Most users sit below these averages; values observed in one run:
# (0.8115345010330012, 0.9123646748297322, 0.9514929596452935, 0.8942560907330712)





## Creazione delle statistiche sugli utenti

In [None]:



def count_elite_years(elite):
    """Return how many Elite years are listed in a user's comma-separated `elite` field.

    Any "20,20" substring is rewritten to "2020," before splitting.
    NOTE(review): this looks like a fix for 2020 being stored as "20,20" in the
    raw data — confirm against the dataset. A null field counts as zero years.
    """
    if elite is None:
        return 0
    return len(elite.replace("20,20", "2020,").replace(",", " ").split())


# One row per user with the number of Elite years.
elite_years_schema = StructType([
    StructField('user_id', StringType(), True),
    StructField('EliteYears', IntegerType(), True),
])
elite_years = spark.createDataFrame(
    userDS.rdd.map(lambda row: [row['user_id'], count_elite_years(row['elite'])]),
    elite_years_schema)

# Full user table enriched with the derived metrics.
Complete = (userDS
            .join(compliments, on=["user_id"], how='left')
            .join(elite_years, on=["user_id"], how='left')
            .join(sent_votes, on=["user_id"], how='left'))

# Average number of Elite years, over users with at least one.
avg_elite = Complete.where(Complete['EliteYears'] != 0).agg(functions.avg('EliteYears')).first()[0]

# "Good" users: at or above the (non-zero) average on every metric.
goodUsers = Complete.select("user_id").where(
    (Complete['fans'] >= avg_fans)
    & (Complete['review_count'] >= avg_review)
    & (Complete['COMPLIMENTS'] >= avg_compliments)
    & (Complete['SENT_VOTES'] >= avg_votes)
    & (Complete['EliteYears'] >= avg_elite))


In [None]:
print('Dataset Completo')
# Preview of the merged user table: profile fields plus the derived metrics.
# NOTE(review): this output contains user names and friend ids; clear it before sharing.
(Complete
 .select('user_id', 'fans', 'friends', 'name', 'review_count', 'yelping_since',
         'COMPLIMENTS', 'EliteYears', 'SENT_VOTES')
 .limit(10)
 .show())


In [None]:
# Share of users whose number of Elite years is strictly below avg_elite.
num_less_avg_elite = (Complete
                      .where(Complete["EliteYears"] < avg_elite)
                      .select("user_id")
                      .count())
perc_inf_elite = num_less_avg_elite / TotalUsers
# Order: reviews, votes, compliments, fans, elite — must match the plot's row labels.
perc_inf = np.array([
    perc_inf_rev,
    perc_inf_vot,
    perc_inf_compl,
    perc_inf_fans,
    perc_inf_elite,
])

In [None]:
# Stacked bar chart: for each metric, the share of users NOT below the average
# ("higher") and below it ("lower"); the two shares sum to 1.
perc_sup = 1 - perc_inf
df_plot = pd.DataFrame(
    {"higher": perc_sup, "lower": perc_inf},
    index=["perc_rev", "perc_vot", "perc_compl", "perc_fans", "perc_elite"],
)


def plot_user_analysis(shares):
    """Draw `shares` as a stacked bar chart with a percentage y-axis and return the Axes."""
    ax = shares.plot(kind='bar', stacked=True, colormap='Spectral', title='User Analysis')
    ax.set_xlabel('metric')
    ax.set_ylabel('share of users')
    ax.yaxis.set_major_formatter(mtick.PercentFormatter(xmax=1.0))
    return ax


# Assigned (not a bare expression) so "all" interactivity does not echo the Axes repr.
ax = plot_user_analysis(df_plot)
plt.show()


In [None]:
from pyspark.context import SparkContext
import re 
# NOTE(review): `sc` is not used anywhere else in this notebook.
sc = SparkContext.getOrCreate()

# (user, business, stars) for every review written by a goodUser; the left join
# also keeps goodUsers without reviews, with null business_id/stars.
localss = (goodUsers
           .join(reviewDS, on=['user_id'], how='left')
           .select('user_id', 'business_id', 'stars'))
# Businesses those users rated strictly above 3 stars (null-star rows drop out here).
localss3stars = localss.where(localss['stars'] > 3).select('user_id', 'business_id')


In [None]:
# localss3stars keeps only reviews with stars > 3, i.e. strictly more than 3 stars.
print('Locali recensiti dai goodUsers con più di 3 stelle')
localss3stars.distinct().limit(10).show()

In [None]:
# Pick one user at random; the fixed seed makes the pick repeatable on the same data.
Users = userDS.select('user_id')
person = Users.rdd.takeSample(withReplacement=False, num=1, seed=13)
person_id = person[0]['user_id']


In [None]:
# Show the sampled Row; its user_id is stored in person_id.
person

In [None]:
def count_friends(friends):
    """Return the number of friend ids in a user's comma-separated `friends` field.

    A null field counts as zero friends. NOTE(review): the Yelp user dump is
    believed to store the literal string "None" for users without friends. It is
    treated as zero here; otherwise such users would count one friend and the
    `NumeroAmici == 0` branch of the recommender could never trigger. Confirm
    against the data.
    """
    if friends is None or friends.strip() == "None":
        return 0
    return len(friends.replace(",", " ").split())


# One row per user with the number of friends (table `df`, used by the recommender).
friends_schema = StructType([
    StructField('user_id', StringType(), True),
    StructField('NumFriends', IntegerType(), True),
])
df = spark.createDataFrame(
    userDS.rdd.map(lambda row: [row['user_id'], count_friends(row['friends'])]),
    friends_schema)


In [None]:
# Preview of the per-user friends-count table (first 10 rows).
df.limit(10).show()

In [None]:
from IPython.display import display

# How many venues to recommend (and to draw on the map).
N_SUGGESTIONS = 10

# Number of friends of the sampled user, from the per-user friends-count table `df`.
NumeroAmici = df.select('NumFriends').where(df['user_id'] == person_id).first()[0]
NomiLocali = businessDS.select('name', 'business_id', 'stars')
# Venues the sampled user rated below 3 stars: they must never be recommended.
NotGoodLocals = reviewDS.select('user_id', 'business_id').where(
    (reviewDS['user_id'] == person_id) & (reviewDS['stars'] < 3))
# Fallback pool: venues goodUsers rated above 3 stars, with name and the business's
# own `stars` column, minus those the user disliked.
Locali_suggeriti = (localss3stars.select('business_id').distinct()
                    .join(NomiLocali, on=['business_id'], how='left')
                    .join(NotGoodLocals, on=['business_id'], how='leftanti'))

if NumeroAmici == 0:
    print('Poichè l utente non ha amici , gli verranno suggeriti locali in funzione degli utenti migliori')
    print('.-.-.-.-.-.-.-.-.-.-.-.-.-.-.-.-.-.-.')
    Locali_suggeriti.show()
    Mappa = Locali_suggeriti.limit(N_SUGGESTIONS)
else:
    print('Lista dei locali consigliati')
    # `friends` is a comma-separated string of user ids.
    h = userDS.where(userDS['user_id'] == person_id).select('friends').first()[0]
    listaAmici = h.replace(",", "").split()
    # Venues that at least one friend rated above 3 stars.
    localiBenRecensiti = (reviewDS
                          .filter(reviewDS['user_id'].isin(listaAmici))
                          .filter(reviewDS['stars'] > 3)
                          .select('business_id'))
    localss = (localiBenRecensiti
               .join(NomiLocali, on='business_id', how='left')
               .join(NotGoodLocals, on=['business_id'], how='leftanti'))
    # Keep venues whose business-level `stars` exceed 3. distinct() is needed because
    # several friends may have reviewed the same venue (and numLoc must count venues).
    localss = localss.select('business_id', 'name', 'stars').where(localss['stars'] > 3).distinct()
    numLoc = localss.count()
    if numLoc < N_SUGGESTIONS:
        # Fewer than N_SUGGESTIONS: top up from the goodUsers pool, skipping venues
        # the friends already suggested.
        numAdd = N_SUGGESTIONS - numLoc
        print(f'il numero di locali recensiti dagli amici è {numLoc} \n')
        print(f'Di conseguenza verranno proposti {numAdd} locali scelti dalla lista dei goodUsers \n')
        temp = (Locali_suggeriti
                .join(localss.select('business_id'), on='business_id', how='leftanti')
                .limit(numAdd))
        final = temp.union(localss)
        final.show()
        Mappa = final
    else:
        localss.limit(N_SUGGESTIONS).show()
        Mappa = localss.limit(N_SUGGESTIONS)

# Names and coordinates of the recommended venues, collected in ONE Spark job so that
# each name stays aligned with its own coordinates. Collecting each column separately
# re-evaluates an unordered, limit()-ed plan and may return rows in a different order.
locations = (Mappa.select('business_id')
             .join(businessDS, on='business_id', how='left')
             .select('name', 'latitude', 'longitude'))
locations_pd = locations.toPandas()
names = locations_pd['name'].to_numpy()
latitudes = locations_pd['latitude'].to_numpy()
longitudes = locations_pd['longitude'].to_numpy()

if len(locations_pd) == 0:
    # An empty result would give a NaN map centre.
    print('Nessun locale da mostrare sulla mappa')
else:
    # Named mappa_locali (not `map`) so the builtin map() is not shadowed.
    mappa_locali = folium.Map(location=[latitudes.mean(), longitudes.mean()],
                              zoom_start=14, control_scale=True)
    for name, lat, lon in zip(names, latitudes, longitudes):
        folium.Marker([lat, lon], popup=name).add_to(mappa_locali)
    display(mappa_locali)


    
        

In [None]:
# Sample of businesses whose `stars` value is exactly 5.
five_stars = businessDS['stars'] == 5
businessDS.where(five_stars).select('business_id', 'name', 'stars').limit(10).show()