In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

## Initialisation de Spark

In [None]:
# Configure the session step by step: application name first, then the
# PostgreSQL JDBC driver jar passed through the "spark.jars" setting.
session_builder = SparkSession.builder.appName("Projet InfoD")
session_builder = session_builder.config(
    "spark.jars",
    "/home1/tc737978/Documents/M2/Info décisionnelle/postgresql-42.7.5.jar",
)
spark = session_builder.getOrCreate()

# Turn on eager evaluation for the REPL/notebook display of DataFrames.
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)


## Chargement des données

In [None]:
# File-based inputs: tips are read from CSV (header row, inferred column
# types); businesses and check-ins are read from JSON files.
tip = spark.read.csv("yelp_academic_dataset_tip.csv", header=True, inferSchema=True)
business = spark.read.json("yelp_academic_dataset_business.json")
checkin = spark.read.json("yelp_academic_dataset_checkin.json")

# Reviews are read from the "yelp.review" table of a PostgreSQL database over JDBC.
jdbc_url = "jdbc:postgresql://stendhal:5432/tpid2020"
jdbc_properties = {
    "user": "tpid",
    "password": "tpid",
    "driver": "org.postgresql.Driver",
}
reviews = spark.read.jdbc(url=jdbc_url, table="yelp.review", properties=jdbc_properties)

In [None]:
# Print a preview of the tip DataFrame loaded from the CSV file.
tip.show()

In [None]:
# Print a preview of the business DataFrame loaded from the JSON file.
business.show()

In [None]:
# Print a preview of the checkin DataFrame loaded from the JSON file.
checkin.show()

In [None]:
# Print a preview of the reviews DataFrame read over JDBC.
reviews.show()

## Création des dataframes

### Business

In [None]:
# Keep only the business key, then append one generated identifier column
# for each name below (added in this order).
Business = business.select("business_id")
for id_column in (
    "ID_Categorie",
    "ID_Attributes",
    "ID_Hours",
    "ID_Localisation",
    "ID_Reviews",
):
    Business = Business.withColumn(id_column, monotonically_increasing_id())
Business.show()


### Localisation

In [21]:
# State/province codes grouped by the country label they are mapped to.
# Any state absent from both lists (or missing) is labelled "Unknown".
usa_states = [
    "NC", "AZ", "NV", "IL", "PA", "WI", "SC", "OH", "CA", "TX", "NY",
    "CO", "XWY", "GA", "HPL", "AL", "UT", "VT", "WA", "NE", "DOW", "MI",
    "FL", "AR", "HI", "OR", "AK", "VA", "CT", "MO", "DUR",
]
can_states = ["QC", "ON", "AB", "BC", "YT", "MB"]

state_column = col("state")
country_column = (
    when(state_column.isin(usa_states), "USA")
    .when(state_column.isin(can_states), "CAN")
    .otherwise("Unknown")
)

# Address fields of each business, plus a generated id and the derived country.
Localisation = (
    business.select("address", "city", "state", "postal_code", "latitude", "longitude")
    .withColumn("id", monotonically_increasing_id())
    .withColumn("country", country_column)
)
Localisation.show()

### Reviews

In [10]:
# Create an empty DataFrame named Reviews from an empty RDD.
# NOTE(review): the schema is taken from `business`, not from `reviews` —
# this looks like a placeholder; confirm the intended schema.
Reviews = spark.createDataFrame(spark.sparkContext.emptyRDD(), schema=business.schema)

### Year

### Categories

In [None]:
# Categories holds only the ID_Categorie column taken from Business.
Categories = Business.select("ID_Categorie")


### Categories_List

In [None]:
# Build one (ID_Categorie, Name) row per category of each business.
#
# The previous implementation collected every business id to the driver, ran
# one Spark job per business, and unioned Categories_List with itself for each
# category. That doubled the DataFrame on every iteration, attached every
# category name to every ID_Categorie, and crashed on businesses whose
# `categories` value is null. It also seeded the result with empty-string
# names, which are no longer produced.
#
# Here the work stays in Spark: Business supplies the ID_Categorie of each
# business_id, the raw `categories` string is split on ", " and exploded into
# one row per name. explode() emits no row for a null array, so businesses
# without categories are simply absent from the result.
Categories_List = (
    Business.select("business_id", "ID_Categorie")
    .join(business.select("business_id", "categories"), on="business_id", how="inner")
    .select("ID_Categorie", explode(split(col("categories"), ", ")).alias("Name"))
)
Categories_List.show()

### Attributes

In [12]:
# Empty DataFrame using the schema of `business`, extended with a generated
# "id" column.
empty_rows = spark.sparkContext.emptyRDD()
Attributes = (
    spark.createDataFrame(empty_rows, schema=business.schema)
    .withColumn("id", monotonically_increasing_id())
)

### Attributes_List

### Hours

In [None]:
# Flatten the nested `hours` struct of each business into top-level columns
# and append a generated "id" column.
Hours = (
    business.select("hours.*")
    .withColumn("id", monotonically_increasing_id())
)
Hours.show()