# CURSO: MINERIA DE DATOS
### ESTUDIANTE: ROSMEL URIEL DEZA CONDORI
### CODIGO: 171058
### TAREA: 5 Ejercicios de Pre-Procesamiento

## IMPORTAR LIBRERIAS

In [None]:
# Import findspark and initialise it so that Spark can be accessed from any
# Python working environment (e.g. a plain Jupyter kernel).
import findspark 
findspark.init()
# Import the big-data (PySpark) libraries
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T

In [None]:
from pyspark.ml.feature import BucketedRandomProjectionLSH
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col

## 1. ALGORITMO DE ESCALAMIENTO

In [None]:
# Libraries required for feature scaling
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession

# `spark` is used below, so make sure the session exists in this cell.
# getOrCreate() hands back the running session when there already is one,
# so the identical call further down the notebook keeps working unchanged.
spark = SparkSession.builder.master("local[4]").getOrCreate()

# Input data: one dense 3-component feature vector per row
dataFrame = spark.createDataFrame([
    (0, Vectors.dense([1.0, 0.1, -1.0]),),
    (1, Vectors.dense([2.0, 1.1, 1.0]),),
    (2, Vectors.dense([3.0, 10.1, 3.0]),)
], ["id", "features"])

# Configure the MinMaxScaler (reads "features", writes "scaledFeatures")
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")

# Compute the summary statistics and produce a MinMaxScalerModel
scalerModel = scaler.fit(dataFrame)

# Rescale every feature to the scaler's [min, max] range
scaledData = scalerModel.transform(dataFrame)
print("Funciones escaladas al rango: [%f, %f]" % (scaler.getMin(), scaler.getMax()))
scaledData.select("features", "scaledFeatures").show()

Funciones escaladas al rango: [0.000000, 1.000000]
+--------------+--------------+
|      features|scaledFeatures|
+--------------+--------------+
|[1.0,0.1,-1.0]|     (3,[],[])|
| [2.0,1.1,1.0]| [0.5,0.1,0.5]|
|[3.0,10.1,3.0]| [1.0,1.0,1.0]|
+--------------+--------------+



## 2. ALGORITMO DE NORMALIZACIÓN

In [None]:
# Libraries used in this cell
from pyspark.ml.feature import Normalizer
from pyspark.ml.linalg import Vectors

# Raw feature values; the row id is simply the position in this list
raw_vectors = [
    [1.0, 0.5, -1.0],
    [2.0, 1.0, 1.0],
    [4.0, 10.0, 2.0],
]

# Build the input DataFrame with columns (id, features)
dataFrame = spark.createDataFrame(
    [(row_id, Vectors.dense(values)) for row_id, values in enumerate(raw_vectors)],
    ["id", "features"],
)

# Normalise each vector with the Normalizer using p=1.0 (L1 norm)
normalizer = Normalizer(p=1.0, inputCol="features", outputCol="normFeatures")
l1NormData = normalizer.transform(dataFrame)

print("Normalizando....")
l1NormData.show()

Normalizando....
+---+--------------+------------------+
| id|      features|      normFeatures|
+---+--------------+------------------+
|  0|[1.0,0.5,-1.0]|    [0.4,0.2,-0.4]|
|  1| [2.0,1.0,1.0]|   [0.5,0.25,0.25]|
|  2|[4.0,10.0,2.0]|[0.25,0.625,0.125]|
+---+--------------+------------------+



## 3. ALGORITMO DE BAG OF WORDS

In [None]:
# Libraries used in this cell
from pyspark.sql.functions import explode, split, regexp_replace, col, lower

# Sample data: the last row has no document text (None)
data = [
    ['a', 'Jan', 'John', 'This is a document'],
    ['b', 'Feb', 'Mary', 'A book by Mary'],
    ['c', 'Mar', 'Luke', 'Newspaper article'],
    ['d', 'Apr', 'Mark', None]
]
columns = ['Title', 'Month', 'Author', 'Document']

# Create the DataFrame df and display it
df = spark.createDataFrame(data, columns)
df.show()

# Bag of words: replace ',', '.' and '-' with a space, split on whitespace
# and emit one row per token. The split pattern is a raw string so that
# "\s" reaches the regex engine instead of being an invalid Python escape.
words = df.select(
    explode(
        split(regexp_replace("Document", "[,.-]", " "), r"\s+")
    ).alias("word")
)

# Count case-insensitively. Empty tokens (produced when a document starts or
# ends with one of the replaced characters) are dropped so that "" is never
# reported as a word.
(
    words
    .where(col("word") != "")
    .groupby(lower(col("word")).alias("lower"))
    .count()
    .show()
)

+-----+-----+------+------------------+
|Title|Month|Author|          Document|
+-----+-----+------+------------------+
|    a|  Jan|  John|This is a document|
|    b|  Feb|  Mary|    A book by Mary|
|    c|  Mar|  Luke| Newspaper article|
|    d|  Apr|  Mark|              null|
+-----+-----+------+------------------+

+---------+-----+
|    lower|count|
+---------+-----+
| document|    1|
|       is|    1|
|        a|    2|
|     this|    1|
|       by|    1|
|     mary|    1|
|     book|    1|
|newspaper|    1|
|  article|    1|
+---------+-----+



## 4. ALGORITMO DE N-GRAMS

In [None]:
# Libraries used in this cell
from pyspark.ml.feature import NGram
from pyspark.sql import SparkSession

# Each sentence is split on whitespace into its list of words; the row id is
# the sentence's position in this list.
sentences = [
    "Hi I heard about Spark",
    "I wish Java could use case classes",
    "Logistic regression models are neat",
]
wordDataFrame = spark.createDataFrame(
    [(row_id, sentence.split()) for row_id, sentence in enumerate(sentences)],
    ["id", "words"],
)

# Bigram transformer: reads "words", writes the 2-grams to "ngrams"
ngram = NGram(inputCol="words", outputCol="ngrams", n=2)

ngramDataFrame = ngram.transform(wordDataFrame)

# Show the generated n-grams without truncating the column
ngramDataFrame.select("ngrams").show(truncate=False)

+------------------------------------------------------------------+
|ngrams                                                            |
+------------------------------------------------------------------+
|[Hi I, I heard, heard about, about Spark]                         |
|[I wish, wish Java, Java could, could use, use case, case classes]|
|[Logistic regression, regression models, models are, are neat]    |
+------------------------------------------------------------------+



#### LECTURA DE DATOS PARA EL TF-IDF

In [None]:
# Start (or reuse) a Spark session whose master is "local[4]".
spark = (
    SparkSession.builder
    .master("local[4]")
    .getOrCreate()
)

In [None]:
# Read tripadvisor_hotel_reviews.csv (its first line is the header) and
# preview the first 12 rows, cutting long cell values at 100 characters.
df = spark.read.option("header", True).csv("tripadvisor_hotel_reviews.csv")
df.show(12, truncate=100)

+----------------------------------------------------------------------------------------------------+------+
|                                                                                              Review|Rating|
+----------------------------------------------------------------------------------------------------+------+
|nice hotel expensive parking got good deal stay hotel anniversary, arrived late evening took advi...|     4|
|ok nothing special charge diamond member hilton decided chain shot 20th anniversary seattle, star...|     2|
|nice rooms not 4* experience hotel monaco seattle good hotel n't 4* level.positives large bathroo...|     3|
|unique, great stay, wonderful time hotel monaco, location excellent short stroll main downtown sh...|     5|
|great stay great stay, went seahawk game awesome, downfall view building did n't complain, room h...|     5|
|love monaco staff husband stayed hotel crazy weekend attending memorial service best friend husba...|     5|
|cozy stay

## 5. ALGORITMO DE **IDF**

La frecuencia inversa de documento es una medida de si el término es común o no, en la colección de documentos.

In [None]:
# Number of rows in df, attached to every row of idf_table as Documents_count.
# df.count() runs immediately, before the lazy pipeline below is defined.
total_documents = df.count()

# Number of tf_table rows per token.
# NOTE(review): tf_table is defined outside this excerpt; presumably it holds
# one row per (Token, Document_ID) pair, which would make this the number of
# documents containing the token - confirm.
token_document_counts = (
    tf_table
    .groupby('Token')
    .agg(F.count('Document_ID').alias('Token_in_all_documents_count'))
)

# Keep only the 100 tokens with the highest count and add the corpus size.
idf_table = (
    token_document_counts
    .orderBy('Token_in_all_documents_count', ascending=False)
    .limit(100)
    .withColumn('Documents_count', F.lit(total_documents))
)

idf_table.show(33)

+---------+----------------------------+---------------+
|    Token|Token_in_all_documents_count|Documents_count|
+---------+----------------------------+---------------+
|    hotel|                       16325|          20491|
|     room|                       14056|          20491|
|      not|                       12124|          20491|
|    staff|                       11528|          20491|
|    great|                       11021|          20491|
|     stay|                       10096|          20491|
|     good|                        9280|          20491|
|   stayed|                        8552|          20491|
|       nt|                        8383|          20491|
|    rooms|                        8341|          20491|
| location|                        8172|          20491|
|     just|                        7736|          20491|
|    clean|                        7651|          20491|
|     nice|                        7420|          20491|
|      did|                    

## 6. ALGORITMO DE TF-IDF

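El tf-idf se calcula como:

$$\text{tf-idf}(t, d) = \text{tf}(t, d) \times \text{idf}(t), \qquad \text{tf}(t, d) = \frac{n_{t,d}}{\sum_{k} n_{k,d}}, \qquad \text{idf}(t) = \log_2 \frac{N}{\text{df}(t)}$$

donde $n_{t,d}$ es el número de veces que el término $t$ aparece en el documento $d$, $N$ es el número total de documentos y $\text{df}(t)$ es el número de documentos que contienen el término $t$.
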
Un peso alto en tf-idf se alcanza con una elevada frecuencia de término (en el documento dado) y una pequeña frecuencia de ocurrencia del término en la colección completa de documentos. Como el cociente dentro de la función logaritmo del idf es siempre mayor o igual que 1, el valor del idf (y del tf-idf) es mayor o igual que 0. Cuando un término aparece en muchos documentos, el cociente dentro del logaritmo se acerca a 1, ofreciendo un valor de idf y de tf-idf cercano a 0.

In [None]:
# Sum of Token_in_document_count per document, exposed as Tokens_count.
tokens_per_document = (
    tf_table
    .groupBy('Document_ID')
    .agg(F.sum('Token_in_document_count').alias('Tokens_count'))
)

# Attach the per-document total and the per-token counts from idf_table to
# every tf_table row. Both are left joins, so tokens that are missing from
# idf_table keep their row with nulls in the idf_table columns.
tf_idf_table = (
    tf_table
    .join(tokens_per_document, on='Document_ID', how='left')
    .join(idf_table, on='Token', how='left')
)

tf_idf_table.show(33)

+----------+-----------+-----------------------+------------+----------------------------+---------------+
|     Token|Document_ID|Token_in_document_count|Tokens_count|Token_in_all_documents_count|Documents_count|
+----------+-----------+-----------------------+------------+----------------------------+---------------+
|      room|          0|                      3|          86|                       14056|          20491|
|    better|          1|                      2|         243|                        3244|          20491|
|attractive|          6|                      1|          98|                        null|           null|
|  positive|          6|                      1|          98|                        null|           null|
| concierge|          7|                      2|          85|                        null|           null|
|        nt|         10|                      2|          44|                        8383|          20491|
|     clean|         12|             

In [None]:
# TF: occurrences of the token in the document over the document's token total.
term_frequency = F.col('Token_in_document_count') / F.col('Tokens_count')

# IDF: base-2 logarithm of (documents count / count of the token across documents).
inverse_document_frequency = F.log2(
    F.col('Documents_count') / F.col('Token_in_all_documents_count')
)

# Discard rows whose token had no match in idf_table (null count), then add
# the TF, IDF and TF-IDF columns in that order.
tf_idf_table = (
    tf_idf_table
    .dropna(subset=['Token_in_all_documents_count'])
    .withColumn('TF', term_frequency)
    .withColumn('IDF', inverse_document_frequency)
    .withColumn('TF-IDF', F.col('TF') * F.col('IDF'))
)

# Preview the score of each (token, document) pair.
tf_idf_table.select('Token', 'Document_ID', 'TF-IDF').show(33)

+-----------+-----------+--------------------+
|      Token|Document_ID|              TF-IDF|
+-----------+-----------+--------------------+
|       room|          0|0.018969917298355492|
|     better|          1|   0.021885964343023|
|         nt|         10| 0.05861144808162943|
|      clean|         12|0.016919882906726882|
|       stay|         15|0.009772311740784861|
|       desk|         16| 0.06791032495372712|
|        bed|         19|0.016141640469173452|
|  excellent|         30| 0.05977208382864487|
|     really|         32|0.025233545234842725|
|     street|         70| 0.09023604479773865|
|        bed|         80|0.024871303171889705|
|        day|        116|0.025712667815178728|
|       just|        125| 0.02066662782967244|
|     little|        133|0.027726300229127476|
|     hotels|        146| 0.01870759696543231|
|        did|        153|0.007976294401516069|
|    walking|        173| 0.05654406021356884|
|         no|        176|  0.0078983576505205|
|   bathroom|

In [None]:
# Number of rows left in tf_idf_table after the rows with a null
# Token_in_all_documents_count were dropped; this triggers a Spark job.
tf_idf_table.count()

444239

In [None]:
# Take three rows of tf_idf_table (no ordering is applied before the limit)
# and pivot them into a Token x Document_ID matrix of TF-IDF scores.
sample_rows = tf_idf_table.limit(3)

tf_idf_matrix = (
    sample_rows
    .groupBy('Token')
    .pivot('Document_ID')
    .agg(F.first('TF-IDF'))
)
tf_idf_matrix.show()

+------+--------------------+-----------------+-------------------+
| Token|                   0|                1|                 10|
+------+--------------------+-----------------+-------------------+
|  room|0.018969917298355492|             null|               null|
|better|                null|0.021885964343023|               null|
|    nt|                null|             null|0.05861144808162943|
+------+--------------------+-----------------+-------------------+

