In [1]:
import findspark

findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import DateType,IntegerType,FloatType

In [2]:
# Create (or reuse, if one is already running) the Spark session for this notebook,
# registered under the application name "Hoteles".
spark = (
    SparkSession.builder
    .appName("Hoteles")
    .getOrCreate()
)

**Información #1**

In [3]:
# Load the hotel listing (header row, inferred column types), remove exact duplicate
# rows, and discard rows with no rating, no name or no price range.
informacion = (
    spark.read
    .options(header="True", inferSchema="True")
    .csv("hoteles.csv")
    .dropDuplicates()
    .dropna(subset=["calificacion", "nombre", "rango_precios"])
)

In [4]:
# Missing text fields get the placeholder "Sin informacion".
informacion = informacion.na.fill(
    "Sin informacion",
    subset=["facilidades", "telefono", "direccion"],
)

# A missing per-aspect rating falls back to the hotel's overall rating.
for sub_rating in ("ubicacion", "limpieza", "servicio", "calidad_precio"):
    informacion = informacion.withColumn(
        sub_rating,
        coalesce(col(sub_rating), col("calificacion")),
    )

# Keep only hotels with more than 100 opinions.
informacion = informacion.where(col("opiniones") > 100)

In [5]:
# Double the overall rating and each per-aspect rating.
# NOTE: this cell reassigns `informacion` from itself, so running it a second time
# doubles the values again.
for rating_col in ("calificacion", "ubicacion", "limpieza", "servicio", "calidad_precio"):
    informacion = informacion.withColumn(rating_col, col(rating_col) * 2)

**Información #2**

In [6]:
# Load the reviews file (header row, inferred column types), remove exact duplicate
# rows, and discard reviews with no date, no author or no hotel.
informacion_2 = (
    spark.read
    .options(header="True", inferSchema="True")
    .csv("opiniones.csv")
    .dropDuplicates()
    .dropna(subset=["fecha", "autor", "hotel"])
)

In [7]:
# Parse the review date, derive its year and month, and double the review rating.
# year()/month() read the parts straight from the date value instead of splitting the
# date's string rendering on "-", so the result does not depend on how the date is
# formatted as text. Both return integers, and null when the date is null.
# NOTE: this cell reassigns `informacion_2` from itself; re-running it doubles
# `calificacion` again.
informacion_2 = (
    informacion_2
    .withColumn("fecha", col("fecha").cast(DateType()))
    .withColumn("YearDate", year(col("fecha")))
    .withColumn("MonthDate", month(col("fecha")))
    .withColumn("calificacion", col("calificacion") * 2)
)

**Creación de Tablas**

**Preparación de Hoteles: asignación de Id_Hotel**

In [8]:
# Give every hotel a surrogate key and strip surrounding whitespace from its name.
#
# monotonically_increasing_id() produces unique, increasing (not consecutive) values
# that depend on how the rows are partitioned, so it is non-deterministic. `hoteles`,
# `detalles` and the `opiniones` join are all derived from this frame and are written
# by separate actions; without caching, each action would regenerate Id_Hotel and the
# keys could disagree between tables. cache() lets them reuse one materialisation
# (best effort: an evicted partition would still be recomputed).
informacion = (
    informacion
    .withColumn("Id_Hotel", monotonically_increasing_id() + 1)
    .withColumn("nombre", trim(col("nombre")))
    .cache()
)

**Hoteles #1**

In [9]:
# Hoteles table: the hotel key, its name, its ratings and its contact columns.
hoteles = informacion.select(
    [
        "Id_Hotel",
        "nombre",
        "calificacion",
        "ubicacion",
        "limpieza",
        "servicio",
        "calidad_precio",
        "direccion",
        "telefono",
    ]
)

In [10]:
# Per-hotel detail rows. `rango_precios` is split on "," and its first and second
# pieces become the integer columns Promedio_Min and Promedio_Max.
precio_partes = split(col("rango_precios"), ",")

# Id_Detalle comes from monotonically_increasing_id(), which is non-deterministic
# (it depends on partitioning). This frame feeds both the Detalles table and
# `detalles_extend`, which are written by separate actions, so cache() is used to let
# both reuse one materialisation of the key instead of regenerating it.
detalles = (
    informacion
    .withColumn("Promedio_Min", precio_partes.getItem(0).cast(IntegerType()))
    .withColumn("Promedio_Max", precio_partes.getItem(1).cast(IntegerType()))
    .withColumn("Id_Detalle", monotonically_increasing_id() + 100)
    .select(
        "Id_Hotel",
        "Id_Detalle",
        "Promedio_Min",
        "Promedio_Max",
        "habitaciones",
        "opiniones",
        "facilidades",
    )
    .cache()
)

In [11]:
# One row per (detail, facility): split the comma-separated `facilidades` text and
# emit a row for each piece, trimmed of surrounding whitespace.
# explode() is used rather than posexplode() because the element position was never
# kept; the result has exactly the columns Id_Detalle and Servicio.
# NOTE: this cell needs `detalles` to still carry the `facilidades` column, i.e. it
# must run before the cell that narrows `detalles` to its final columns.
detalles_extend = (
    detalles
    .select(
        "Id_Detalle",
        explode(split(col("facilidades"), ",")).alias("Servicio"),
    )
    .withColumn("Servicio", trim(col("Servicio")))
)

**Detalles #2**

In [12]:
# Final Detalles table: key columns first, then the numeric details. The raw
# `facilidades` text is not carried over.
detalles = detalles.select(
    [
        "Id_Detalle",
        "Id_Hotel",
        "Promedio_Min",
        "Promedio_Max",
        "habitaciones",
        "opiniones",
    ]
)

**Servicios #3**

In [13]:
# Catalogue of distinct facility names, each with a surrogate key.
#
# Id_Servicio comes from monotonically_increasing_id(), which is non-deterministic
# (it depends on partitioning). `servicios` is written on its own and is also joined
# into `detalles_extend`, which is written by a different action; cache() lets both
# reuse one materialisation of the key instead of regenerating it.
servicios = (
    detalles_extend
    .select("Servicio")
    .dropDuplicates()
    .withColumn("Id_Servicio", monotonically_increasing_id() + 20)
    .select("Id_Servicio", "Servicio")
    .cache()
)

**Detalles_Extend #4**

In [14]:
# Replace each facility name by its key, leaving the (Id_Detalle, Id_Servicio) link table.
#
# `servicios` was derived from `detalles_extend`, so an expression such as
# servicios["Servicio"] == detalles_extend["Servicio"] refers to the same underlying
# column on both sides (an ambiguous self-join condition). Joining on the column name
# states the intent unambiguously and keeps a single `Servicio` column in the result.
detalles_extend = (
    detalles_extend
    .join(servicios, on="Servicio", how="inner")
    .select("Id_Detalle", "Id_Servicio")
)

**Segunda Información**

In [15]:
# Start the reviews table from the cleaned reviews: the rating column is renamed to
# `calificacion_opin`, and the hotel and author names are stripped of surrounding whitespace.
opiniones = informacion_2.withColumnRenamed("calificacion", "calificacion_opin")
for text_col in ("hotel", "autor"):
    opiniones = opiniones.withColumn(text_col, trim(col(text_col)))

**Autores #5**

In [16]:
# Catalogue of distinct review authors, each with a surrogate key.
#
# Id_Autor comes from monotonically_increasing_id(), which is non-deterministic
# (it depends on partitioning). `autores` is written on its own and is also joined
# into `opiniones`, which is written by a different action; cache() lets both reuse
# one materialisation of the key instead of regenerating it.
autores = (
    opiniones
    .select("autor")
    .dropDuplicates()
    .withColumn("Id_Autor", monotonically_increasing_id() + 200)
    .select("Id_Autor", "autor")
    .cache()
)

In [17]:
# Key every review, then resolve its hotel and author names to their surrogate keys.
# Both joins are inner joins, so reviews whose hotel has no match in `hoteles` are dropped.
opiniones_con_id = opiniones.withColumn("Id_Opinion", monotonically_increasing_id() + 1)

# `autores` was derived from `opiniones`, so opiniones["autor"] == autores["autor"]
# refers to the same underlying column on both sides (an ambiguous self-join
# condition); joining on the column name avoids that.
#
# Id_Opinion is non-deterministic (it depends on partitioning) and this frame feeds
# both `detalle_opin` and the final `opiniones`, written by separate actions, so
# cache() is used to let both reuse one materialisation of the key.
opiniones = (
    opiniones_con_id
    .join(hoteles, opiniones_con_id["hotel"] == hoteles["nombre"], "inner")
    .join(autores, on="autor", how="inner")
    .select(
        "Id_Opinion",
        "Id_Hotel",
        "Id_Autor",
        "fecha",
        "YearDate",
        "MonthDate",
        "calificacion_opin",
        "titulo",
        "comentario",
    )
    .cache()
)

**Detalle_Opin #6**

In [18]:
# Review text table: the review key with its title and comment.
# NOTE: must run while `opiniones` still carries `titulo` and `comentario`.
detalle_opin = opiniones.select(
    ["Id_Opinion", "titulo", "comentario"]
)

**Opiniones #7**

In [19]:
# Final Opiniones table: keys, date parts and rating; the free-text columns are left out.
opiniones = opiniones.select(
    [
        "Id_Opinion",
        "Id_Hotel",
        "Id_Autor",
        "fecha",
        "YearDate",
        "MonthDate",
        "calificacion_opin",
    ]
)

**Envío de tablas a SQL Server**

In [20]:
# Append the `hoteles` rows to the "Hoteles" table over JDBC.
# The server name, user and password are placeholders to replace before running.
(
    hoteles.write
    .mode("append")
    .format("jdbc")
    .options(
        url="jdbc:sqlserver://YOUR SERVER NAME;databaseName=Hoteles;",
        dbtable="Hoteles",
        user="YOUR USER",
        password="YOUR PASSWORD",
    )
    .save()
)

In [21]:
# Append the `detalles` rows to the "Detalles" table over JDBC.
# The server name, user and password are placeholders to replace before running.
(
    detalles.write
    .mode("append")
    .format("jdbc")
    .options(
        url="jdbc:sqlserver://YOUR SERVER NAME;databaseName=Hoteles;",
        dbtable="Detalles",
        user="YOUR USER",
        password="YOUR PASSWORD",
    )
    .save()
)

In [22]:
# Append the `servicios` rows to the "Servicio" table over JDBC.
# The server name, user and password are placeholders to replace before running.
(
    servicios.write
    .mode("append")
    .format("jdbc")
    .options(
        url="jdbc:sqlserver://YOUR SERVER NAME;databaseName=Hoteles;",
        dbtable="Servicio",
        user="YOUR USER",
        password="YOUR PASSWORD",
    )
    .save()
)

In [23]:
# Append the `detalles_extend` rows to the "Detalle_Extendido" table over JDBC.
# The server name, user and password are placeholders to replace before running.
(
    detalles_extend.write
    .mode("append")
    .format("jdbc")
    .options(
        url="jdbc:sqlserver://YOUR SERVER NAME;databaseName=Hoteles;",
        dbtable="Detalle_Extendido",
        user="YOUR USER",
        password="YOUR PASSWORD",
    )
    .save()
)

In [24]:
# Append the `autores` rows to the "Autores" table over JDBC.
# The server name, user and password are placeholders to replace before running.
(
    autores.write
    .mode("append")
    .format("jdbc")
    .options(
        url="jdbc:sqlserver://YOUR SERVER NAME;databaseName=Hoteles;",
        dbtable="Autores",
        user="YOUR USER",
        password="YOUR PASSWORD",
    )
    .save()
)

In [25]:
# Append the `opiniones` rows to the "Opiniones" table over JDBC.
# The server name, user and password are placeholders to replace before running.
(
    opiniones.write
    .mode("append")
    .format("jdbc")
    .options(
        url="jdbc:sqlserver://YOUR SERVER NAME;databaseName=Hoteles;",
        dbtable="Opiniones",
        user="YOUR USER",
        password="YOUR PASSWORD",
    )
    .save()
)

In [26]:
# Append the `detalle_opin` rows to the "Detalle_Opini" table over JDBC.
# The server name, user and password are placeholders to replace before running.
(
    detalle_opin.write
    .mode("append")
    .format("jdbc")
    .options(
        url="jdbc:sqlserver://YOUR SERVER NAME;databaseName=Hoteles;",
        dbtable="Detalle_Opini",
        user="YOUR USER",
        password="YOUR PASSWORD",
    )
    .save()
)