In [113]:
from pyspark.sql import SparkSession

from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession \
    .builder \
    .appName("how to read csv file") \
    .getOrCreate()

**Cargo los datos en un dataframe de spark**

In [114]:
df = spark.read.csv('/user/ort/obligatorio/monthly_pays.csv', header=True)

**Vista previa de las primeras filas**

In [115]:
df.show()

+-------+----+----------------+--------------+-------------+---------------------+-----------------------------+
|user_id|plan|mensual_cost_usd|local_currency|      country|has_monthly_purchases|purchase_value_local_currency|
+-------+----+----------------+--------------+-------------+---------------------+-----------------------------+
|      1|   C|         $38.90 |         Zloty|       Poland|                FALSE|                       $0.00 |
|      2|   D|         $62.02 |          Peso|     Colombia|                FALSE|                       $0.00 |
|      3|   C|         $99.53 |        Rupiah|    Indonesia|                 TRUE|                     $768.87 |
|      4|   C|        $128.35 |         Naira|      Nigeria|                FALSE|                       $0.00 |
|      5|   D|         $46.69 | Yuan Renminbi|        China|                 TRUE|                   $1,558.45 |
|      6|   A|         $43.57 |        Dollar|United States|                FALSE|              

**Cantidad de columnas del dataframe monthly_pays**

In [116]:
num_columns=len(df.columns)
num_columns

7

**Nombre de las columnas de monthly_pays**

In [117]:
df.columns

['user_id',
 'plan',
 'mensual_cost_usd',
 'local_currency',
 'country',
 'has_monthly_purchases',
 'purchase_value_local_currency']

**Descripción de los datos de la tabla**

In [118]:
df.describe

<bound method DataFrame.describe of DataFrame[user_id: string, plan: string, mensual_cost_usd: string, local_currency: string, country: string, has_monthly_purchases: string, purchase_value_local_currency: string]>

**Schema de la tabla**

In [119]:
df.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- plan: string (nullable = true)
 |-- mensual_cost_usd: string (nullable = true)
 |-- local_currency: string (nullable = true)
 |-- country: string (nullable = true)
 |-- has_monthly_purchases: string (nullable = true)
 |-- purchase_value_local_currency: string (nullable = true)



In [120]:
from pyspark.sql.functions import col, regexp_replace

df = df.withColumn("mensual_cost_usd", regexp_replace(col("mensual_cost_usd"), "\\$", ""))

df = df.withColumn("mensual_cost_usd", col("mensual_cost_usd").cast("int"))

In [121]:
df = df.withColumn("purchase_value_local_currency", regexp_replace(col("purchase_value_local_currency"), "\\$", ""))

df = df.withColumn("purchase_value_local_currency", regexp_replace(col("purchase_value_local_currency"), ",", "").cast("float"))

In [122]:
df = df.withColumn("has_monthly_purchases", col("has_monthly_purchases").cast("boolean"))

In [123]:
df.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- plan: string (nullable = true)
 |-- mensual_cost_usd: integer (nullable = true)
 |-- local_currency: string (nullable = true)
 |-- country: string (nullable = true)
 |-- has_monthly_purchases: boolean (nullable = true)
 |-- purchase_value_local_currency: float (nullable = true)



**Valores Nulos o Faltantes**

In [124]:
total_nulos = df.select([sum(col(c).isNull().cast("int")).alias(c) for c in df.columns])

total_nulos.show()

+-------+----+----------------+--------------+-------+---------------------+-----------------------------+
|user_id|plan|mensual_cost_usd|local_currency|country|has_monthly_purchases|purchase_value_local_currency|
+-------+----+----------------+--------------+-------+---------------------+-----------------------------+
|      0|   0|               0|            15|      0|                    0|                            0|
+-------+----+----------------+--------------+-------+---------------------+-----------------------------+



In [125]:
from pyspark.sql.window import Window
from pyspark.sql import functions as F

windowSpec = Window().partitionBy("country")

monthly_pays_refined = df.withColumn(
    "local_currency",
    F.when(F.col("local_currency").isNull(), F.first("local_currency", True).over(windowSpec)).otherwise(F.col("local_currency"))
)

monthly_pays_refined.show()

+-------+----+----------------+--------------+-------------+---------------------+-----------------------------+
|user_id|plan|mensual_cost_usd|local_currency|      country|has_monthly_purchases|purchase_value_local_currency|
+-------+----+----------------+--------------+-------------+---------------------+-----------------------------+
|    783|   B|             113|       Afghani|  Afghanistan|                 true|                       1209.0|
|    829|   C|             140|       Afghani|  Afghanistan|                false|                          0.0|
|    835|   D|             100|       Afghani|  Afghanistan|                 true|                      2785.16|
|    412|   C|              36|          Euro|Aland Islands|                 true|                      2320.89|
|     67|   B|             149|           Lek|      Albania|                 true|                      1535.58|
|    104|   D|              48|           Lek|      Albania|                 true|              

In [126]:
total_nulos = monthly_pays_refined.select([sum(col(c).isNull().cast("int")).alias(c) for c in df.columns])

total_nulos.show()

+-------+----+----------------+--------------+-------+---------------------+-----------------------------+
|user_id|plan|mensual_cost_usd|local_currency|country|has_monthly_purchases|purchase_value_local_currency|
+-------+----+----------------+--------------+-------+---------------------+-----------------------------+
|      0|   0|               0|             0|      0|                    0|                            0|
+-------+----+----------------+--------------+-------+---------------------+-----------------------------+



In [127]:
hdfs_path = "/user/ort/obligatorio/refined/refined_monthly_pays/"
monthly_pays_refined.write.csv(hdfs_path, header=False, mode="overwrite")