### Guardo el path del archivo en una variable

In [0]:
path_insurance = "dbfs:/FileStore/shared_uploads/insurance/insurance.csv"

### El archivo csv lo leo con spark y lo muestro

In [0]:
df_insurance = spark.read.csv(path_insurance, header= True)
df_insurance.show()

### Transformo la columna charges a 2 decimales

In [0]:
df_insurance = df_insurance.withColumn("charges", df_insurance["charges"].cast("decimal(12,2)"))

### Creo una temp view del dataframe para hacer consultas sql

In [0]:
df_insurance.createOrReplaceTempView("insurance")

### La consulta la guardo en una variable

In [0]:
query = '''
        SELECT *
        FROM insurance
        ORDER BY bmi DESC;
        '''

### La consulta la uso en spark.sql

In [0]:
df_insurance_query = spark.sql(query)

### Muestro el dataframe que surge a base de la consulta

In [0]:
df_insurance_query.show()

In [0]:
query = '''
        SELECT *
        FROM insurance
        ORDER BY charges DESC;
        '''

In [0]:
df_insurance_query = spark.sql(query)
df_insurance_query.show(50, truncate=False)

In [0]:
query = '''
        SELECT age, sex, round(avg(bmi)) as avg_bmi, smoker, region
        FROM insurance
        GROUP BY age, region, sex, smoker
        HAVING region = "southeast"
        ORDER BY avg_bmi DESC;
        '''

In [0]:
df_insurance_query = spark.sql(query)
df_insurance_query.show()

### Ordeno por indice de masa corporal

In [0]:
df_insurance.orderBy("bmi", ascending=True).show(10)


### Reemplazo "male"por "hombre"

In [0]:
df_insurance.na.replace("male", "hombre").show()

### Reemplazo "male" por "hombre" y "female" por "mujer"

In [0]:
df_insurance.na.replace(["male", "female"], ["hombre", "mujer"]).show()

### Paso a may√∫sculas la columna smoker

In [0]:
from pyspark.sql.functions import upper, lower, col

#df_insurance.withColumn("smoker", initcap(col("smoker"))).show(false)
df_insurance.select("age", "sex", "bmi", "children", upper(col("smoker")), "region", "charges").show()

### Hago distinct con la cantidad de hijos

In [0]:
df_insurance.select("children").distinct().show()

### Muestro el schema del dataframe

In [0]:
df_insurance.printSchema()

### Creo el schema del csv con StructType

In [0]:
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, FloatType
#
'''
|age|   sex|   bmi|children|smoker|   region| charges   |
| 19|female|  27.9|       0|   yes|southwest|  16884.924|
| 18|  male| 33.77|       1|    no|southeast|  1725.5523|
| 28|  male|    33|       3|    no|southeast|   4449.462|
| 33|  male|22.705|       0|    no|northwest|21984.47061|
| 32|  male| 28.88|       0|    no|northwest|  3866.8552|
'''


data = [(19,"female",27.9, 0,"yes","southeast",16884.92),
    (18,"male",33.77, 1,"no","southeast", 1725.55),
    (28,"male",33.3, 3,"no","southeast", 4449.46),
    (33,"male",22.70, 0,"no","northeast", 21984.47),
    (32,"male",28.88, 0,"no","northeast", 3866.85)
  ]

schema = StructType([ \
    StructField("age",IntegerType(),True), \
    StructField("sex",StringType(),True), \
    StructField("bmi",FloatType(),True), \
    StructField("children", IntegerType(), True), \
    StructField("smoker", StringType(), True), \
    StructField("region", StringType(), True), \
    StructField("charges", FloatType(), True) \
  ])
 
df = spark.createDataFrame(data=data,schema=schema)
df.printSchema()
df.show(truncate=False)

### Guardo el dataframe en .json y .parquet

In [0]:
#df.coalesce(1).write.format('json').save('FileStore/shared_uploads/insurance/insurance.json')

In [0]:
#df.coalesce(1).write.format('parquet').save('FileStore/shared_uploads/insurance/insurance.parquet')

### Verifico que se hayan guardado los archivos

In [0]:
%fs
ls dbfs:/FileStore/shared_uploads/insurance/

path,name,size
dbfs:/FileStore/shared_uploads/insurance/insurance.csv,insurance.csv,55628
dbfs:/FileStore/shared_uploads/insurance/insurance.json/,insurance.json/,0
dbfs:/FileStore/shared_uploads/insurance/insurance.parquet/,insurance.parquet/,0


### Guardo el path de los archivos en variables

In [0]:
path_json = "dbfs:/FileStore/shared_uploads/insurance/insurance.json"

In [0]:
path_parquet = "dbfs:/FileStore/shared_uploads/insurance/insurance.parquet"

### Los archivos los guardo en un dataframe

In [0]:
df_insurance_json = spark.read.json(path_json)
df_insurance_json.show()

In [0]:
df_insurance_parquet = spark.read.parquet(path_parquet)
df_insurance_parquet.show()

### Hago substring de la columna region

In [0]:
df.withColumn('new_region', df['region'].substr(0, 5)).show()


### Hago un reverse de la columna region y lo guardo en otra columna

In [0]:
from pyspark.sql.functions import col, concat, reverse

df.withColumn("noiger", concat(reverse(col("region")))).show()

### Hago un case con la columna sex

In [0]:
from pyspark.sql.functions import when

df2=df.select(col("age"),
              col("sex"),when(df.sex == "male","M")
                  .when(df.sex == "female","F")
                  .when(df.sex.isNull() ,"")
                  .otherwise(df.sex).alias("new_sex"))

df2.show()

In [0]:
#df.join(df2, df.age == df2.age, 'outer').select(df.age, df2.new_sex)
df3 = df.join(df2, df.age==df2.age)
df3.show()