In [None]:
!pip install pyspark
# Ejecutar en Kaggle o Colab

In [None]:
#import findspark
#findspark.init()

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("pyspark_ejercicio").getOrCreate()
spark

In [None]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

### 1. Leer el csv: pga_tour_historical.csv

In [None]:
pga_tour = spark.read.csv(path = "/kaggle/input/pyspark-teoria-practica/pga_tour_historical.csv",
                          inferSchema = True, header = True)

pga_tour.limit(5).toPandas()

### 2. Imprimir Schema

In [None]:
pga_tour.printSchema()

### 3. Modificar los dtypes de las columnas

In [None]:
from pyspark.sql.types import *

data_schema = list((StructField("Player Name", StringType() , True),
                    StructField("Season"     , IntegerType(), True),
                    StructField("Statistic"  , StringType() , True),
                    StructField("Variable"   , StringType() , True),
                    StructField("Value"      , IntegerType(), True),))

final_struc = StructType(fields = data_schema)

pga_tour = spark.read.csv(path = "/kaggle/input/pyspark-teoria-practica/pga_tour_historical.csv",
                          schema = final_struc)

pga_tour.printSchema()

### 4. Genera las estadísticas de la columna "Value"

In [None]:
pga_tour.describe(["Value"]).show()

In [None]:
pga_tour.select("Value")                                 \
        .summary("count", "min", "max", "stddev", "mean")\
        .show()

In [None]:
pga_tour.describe(["Value", "Season"]).show()

### 5. Genera las estadisticas de las columnas "Value" y "Season"

In [None]:
pga_tour.select("Value", "Season")                       \
        .summary("count", "min", "max", "stddev", "mean")\
        .show()

### 6. Lee el csv de fifa

In [None]:
fifa = spark.read.csv(path = "/kaggle/input/pyspark-teoria-practica/fifa19.csv",
                      inferSchema = True, header = True )

fifa.limit(3).toPandas()

### 7. Imprime el Schema de fifa

In [None]:
fifa.printSchema()

### 8. Selecciona las columnas "Name" y "Position" y ordenalas por "Name"

In [None]:
fifa.select(["Name", "Position"])\
    .orderBy(fifa.Name)          \
    .limit(5)                    \
    .toPandas()

### 9. Selecciona a los jugadores cuyo club comience por "FC"

In [None]:
fifa.select(["Name", "Club"])    \
    .where(fifa.Club.like("FC%"))\
    .show(5, truncate = False)

### 10. Cual es el jugador mas joven y cual es el mas viejo

In [None]:
fifa.select(["Name", "Age"]) \
    .orderBy(fifa.Age.desc())\
    .limit(1)                \
    .toPandas()

In [None]:
fifa.select(["Name", "Age"]) \
    .orderBy(fifa.Age.asc()) \
    .limit(1)                \
    .toPandas()

### 11. Selecciona a los siguiente jugadores:

 - L. Messi
 - Cristiano Ronaldo

In [None]:
fifa[fifa.Name.isin(["L. Messi", "Cristiano Ronaldo"])].toPandas()

### 12. En que moneda le pagan a los jugadores? "Release Clause"

In [None]:
fifa.select("Release Clause",
            fifa["Release Clause"].substr(1, 1).alias("Currency"))\
    .show(5, truncate = False)

### 13. Selecciona a los jugadores:
- mayores de 20 años
- overall mayor a 40
- nacionalidad española o inglesa

In [None]:
fifa.filter("Age > 20 AND Overall > 40 AND (Nationality = 'Spain' OR Nationality = 'England')")\
    .limit(5)                                                                                  \
    .toPandas()

### 14. Leer el csv: Rep_vs_Dem_tweets.csv

In [None]:
tweets = spark.read.csv(path = "/kaggle/input/pyspark-teoria-practica/Rep_vs_Dem_tweets.csv",
                        header = True, inferSchema = True)

tweets.limit(5).toPandas()

### 15. Cambiar los valores de la columna "Party" que no sean "Republican" o "Democrat" a "Other"

In [None]:
counts = tweets.groupBy("Party").count()

counts.orderBy(desc("count")).show(6)

In [None]:
from pyspark.sql.functions import when

clean = tweets.withColumn("Party",
                          when(tweets.Party == "Democrat", "Democrat").    \
                          when(tweets.Party == "Republican", "Republican").\
                          otherwise("Other"))

counts = clean.groupBy("Party").count()

counts.orderBy(desc("count")).show()

In [None]:
clean.select("Party",
             expr("CASE WHEN Party = 'Republican' THEN 'Republican'\
                        WHEN Party = 'Democrat' THEN 'Democrat'    \
                        ELSE 'Other' END AS New_Party"))            \
     .show(5)

### 16. Renombra la columna "Party" a "Dem_Rep"
Usa .withColumnRenamed()

In [None]:
renamed = tweets.withColumnRenamed("Party", "Dem_Rep")
renamed.limit(4).toPandas()

### 17. Concatena la columna "Handle" con "Dem_Rep"

In [None]:
tweets.select(tweets.Party,
              tweets.Handle,
              concat_ws(" ", tweets.Party, tweets.Handle).alias("Concatenated"))\
      .show(5, truncate = False)

### 18. Cuales tweets tienen la palabra "vote"

In [None]:
array = tweets.select("Tweet",
                      lower(tweets.Tweet).alias("lower_tweet"))

array.limit(5).toPandas()

In [None]:
array = array.select("Tweet",
                      "lower_tweet",
                      split(array.lower_tweet, " ").alias("new"))

array.limit(5).toPandas()

In [None]:
array.select("Tweet", array_contains(array.new, "vote")).limit(5).toPandas()

In [None]:
tweets.select('Tweet').where(tweets.Tweet.like("%vote%")).show(truncate = False)

### 19. Cargar el csv: nyc_air_bnb.csv 

In [None]:
airbnb = spark.read.csv(path = "/kaggle/input/pyspark-teoria-practica/nyc_air_bnb.csv",
                        inferSchema = True, header = True)
airbnb.limit(3).toPandas()

### 20. Imprimir Schema

In [None]:
print(airbnb.printSchema())

### 21. Cambiar el formato de las columnas que se asignaron mal

In [None]:
df = airbnb.withColumn("latitude"                      , airbnb["latitude"]                      .cast( DoubleType()))\
           .withColumn("longitude"                     , airbnb["longitude"]                     .cast( DoubleType()))\
           .withColumn("price"                         , airbnb["price"]                         .cast( DoubleType()))\
           .withColumn("minimum_nights"                , airbnb["minimum_nights"]                .cast(IntegerType()))\
           .withColumn("number_of_reviews"             , airbnb["number_of_reviews"]             .cast(IntegerType()))\
           .withColumn("reviews_per_month"             , airbnb["reviews_per_month"]             .cast(  FloatType()))\
           .withColumn("calculated_host_listings_count", airbnb["calculated_host_listings_count"].cast(IntegerType()))\
           .withColumn("availability_365"              , airbnb["availability_365"]              .cast(IntegerType()))\

print(df.printSchema())
df.limit(3).toPandas()

### 22. Cuantas filas tiene el df?

In [None]:
df.count()

### 23. Cuantos reviews hay por host?

In [None]:
df.groupBy("host_id").sum("number_of_reviews").show(20, truncate = False)

### 24. Muestra los maximos y minimos de las columnas numericas

In [None]:
lista = [col for col, type_ in df.dtypes if (type_ == "int") or (type_ == "double")]
lista

In [None]:
df.select(lista)\
  .summary("min", "max")                                                            \
  .toPandas()

### 25. Cuales hosts tienen el mayor numero de reviews?

In [None]:
df.groupBy("host_id")                                  \
  .agg(sum("number_of_reviews").alias("Total Reviews"))\
  .orderBy(sum("number_of_reviews").desc())            \
  .show(5, truncate = False)

### 26. Cual es la media del minimo de noches?

In [None]:
df.agg(avg("minimum_nights")).withColumnRenamed("avg(minimum_nights)", "Avg Nights").show()

### 27. En promedio, cual es el barrio mas caro?

In [None]:
df.groupBy("neighbourhood")      \
  .agg(avg("price").alias("AVG"))\
  .orderBy(avg("price").desc())  \
  .show()

In [None]:
##########################################################################################################################