# Demostración Clase 4
# Spark SQL

## Inicialización del ambiente

In [1]:
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Build (or reuse) the Spark session used throughout this demo.
spark = (
    SparkSession.builder
    .appName("DemoSparkSql")
    .getOrCreate()
)

# Load the Titanic dataset from CSV: the first line is the header and the
# column types are inferred from the data.
# (An explicit schema could be supplied instead with `.schema(schema)`.)
df = (
    spark.read
    .format("csv")
    .options(header="true", inferSchema="true")
    .load("datos/titanic.csv")
)

# Alternative input with the same reader settings:
#   df = spark.read.csv('datos/titanic_long.csv', header=True, inferSchema=True)

# Register the DataFrame as the "titanic" temp view so it can be queried
# with spark.sql, then print the resulting schema.
df.createOrReplaceTempView("titanic")
df.printSchema()



root
 |-- _c0: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Fare: double (nullable = true)



In [2]:
# Maximum age among male passengers, queried with plain SQL on the temp view.
max_age_male = spark.sql("SELECT MAX(Age) FROM titanic where Sex='male' GROUP BY Sex")
max_age_male.show()

+--------+
|max(Age)|
+--------+
|    80.0|
+--------+



In [3]:
# Average age over the rows whose Age is known. The aggregate yields a single
# row; the scalar is read from it by column name and displayed as the cell value.
avg_age_row = df.filter("Age is not null").select(F.avg('Age')).first()
avg_age_row["avg(Age)"]
# Alternative for missing ages (fill instead of drop):
#   df.na.fill({'Age': 100}).show()

29.69911764705882

In [4]:
# Maximum age among male passengers, expressed with the DataFrame API.
# Filter on Sex first and aggregate afterwards, so the filter never refers to
# a column that a preceding projection has already dropped.
df.where(df.Sex == 'male').agg(F.max(df.Age)).show()

+--------+
|max(Age)|
+--------+
|    80.0|
+--------+



In [5]:
# Derive a label column: first-class passengers get a dedicated label,
# every other row falls back to 'Otro'.
class_label = F.when(df.Pclass == 1, "Pasajero clase 1.a").otherwise('Otro')
df.withColumn("Clase", class_label).show()

+---+--------+------+------+----+-------+------------------+
|_c0|Survived|Pclass|   Sex| Age|   Fare|             Clase|
+---+--------+------+------+----+-------+------------------+
|  0|       0|     3|  male|22.0|   7.25|              Otro|
|  1|       1|     1|female|38.0|71.2833|Pasajero clase 1.a|
|  2|       1|     3|female|26.0|  7.925|              Otro|
|  3|       1|     1|female|35.0|   53.1|Pasajero clase 1.a|
|  4|       0|     3|  male|35.0|   8.05|              Otro|
|  5|       0|     3|  male|null| 8.4583|              Otro|
|  6|       0|     1|  male|54.0|51.8625|Pasajero clase 1.a|
|  7|       0|     3|  male| 2.0| 21.075|              Otro|
|  8|       1|     3|female|27.0|11.1333|              Otro|
|  9|       1|     2|female|14.0|30.0708|              Otro|
| 10|       1|     3|female| 4.0|   16.7|              Otro|
| 11|       1|     1|female|58.0|  26.55|Pasajero clase 1.a|
| 12|       0|     3|  male|20.0|   8.05|              Otro|
| 13|       0|     3|  m

In [6]:
# Ten most frequent (Fare, Pclass) combinations.
# No action (show/collect/...) is called here, so the cell only displays the
# DataFrame's repr (its schema); the query itself is not executed.
(
    df.groupBy("Fare", "Pclass")
    .agg(F.count("*").alias("cuenta"))
    .orderBy(F.desc("cuenta"))
    .limit(10)
)

DataFrame[Fare: double, Pclass: int, cuenta: bigint]

In [7]:
# (Re)register the temp view and run the max-age query again, this time
# keeping the Sex column and aliasing the aggregate.
df.createOrReplaceTempView("titanic")
max_age_by_sex = spark.sql("""
SELECT Sex, max(Age) as max
FROM titanic where Sex='male'
GROUP BY Sex
""")
max_age_by_sex.show()


+----+----+
| Sex| max|
+----+----+
|male|80.0|
+----+----+



In [8]:
# q1: the ten most frequent (Fare, Pclass) combinations, written in SQL.
q1 = spark.sql("""
    SELECT 
        Fare,
        Pclass,
        COUNT(*) as cuenta
    FROM titanic
    GROUP BY
        Fare, Pclass
    ORDER BY
        cuenta DESC
    LIMIT 10
    """)

# q2: the same query built with the DataFrame API, so both plans can be compared.
q2 = (
    df.groupBy("Fare", "Pclass")
    .agg(F.count("*").alias("cuenta"))
    .orderBy(F.desc("cuenta"))
    .limit(10)
)

In [9]:
# Print the physical plan of the SQL version and then the DataFrame version.
for query in (q1, q2):
    query.explain()

== Physical Plan ==
TakeOrderedAndProject(limit=10, orderBy=[cuenta#139L DESC NULLS LAST], output=[Fare#21,Pclass#18,cuenta#139L])
+- *(2) HashAggregate(keys=[Fare#21, Pclass#18], functions=[count(1)])
   +- Exchange hashpartitioning(Fare#21, Pclass#18, 200), true, [id=#176]
      +- *(1) HashAggregate(keys=[knownfloatingpointnormalized(normalizenanandzero(Fare#21)) AS Fare#21, Pclass#18], functions=[partial_count(1)])
         +- FileScan csv [Pclass#18,Fare#21] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/home/jovyan/work/DiploUTN/datos/titanic.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Pclass:int,Fare:double>


== Physical Plan ==
TakeOrderedAndProject(limit=10, orderBy=[cuenta#152L DESC NULLS LAST], output=[Fare#21,Pclass#18,cuenta#152L])
+- *(2) HashAggregate(keys=[Fare#21, Pclass#18], functions=[count(1)])
   +- Exchange hashpartitioning(Fare#21, Pclass#18, 200), true, [id=#200]
      +- *(1) HashAggregate(keys=[knownfloat

In [10]:
# Row count of the full dataset, followed by the count after removing
# rows that are duplicated across all columns.
for frame in (df, df.dropDuplicates()):
    print(frame.count())

891
891


In [11]:
# Project two columns and print the first three rows
# (take(3) could be used instead of limit(3).show()).
first_three = df.select("Survived", "Pclass").limit(3)
first_three.show()
# Other projections to try:
#   df.select( "Sex", "Age").take(1)
#   df.select(F.when(df.Sex.startswith('fem'), 'Mujer').otherwise('Hombre').alias('Sexo')).show()


+--------+------+
|Survived|Pclass|
+--------+------+
|       0|     3|
|       1|     1|
|       1|     3|
+--------+------+



In [12]:
# Aggregate alternative using SQL expressions:
#   df.selectExpr("avg(Age)", "count(distinct(Age))", "count(*) as cantPasajeros").show(2)

# Add a boolean column computed from a SQL expression string and print two rows.
with_control = df.withColumn("control", F.expr("Survived == Pclass"))
with_control.show(2)

+---+--------+------+------+----+-------+-------+
|_c0|Survived|Pclass|   Sex| Age|   Fare|control|
+---+--------+------+------+----+-------+-------+
|  0|       0|     3|  male|22.0|   7.25|  false|
|  1|       1|     1|female|38.0|71.2833|   true|
+---+--------+------+------+----+-------+-------+
only showing top 2 rows



In [13]:
# Keep passengers younger than 30 (condition given as a SQL string) and print two rows.
under_30 = df.filter("Age < 30")
under_30.show(2)

+---+--------+------+------+----+-----+
|_c0|Survived|Pclass|   Sex| Age| Fare|
+---+--------+------+------+----+-----+
|  0|       0|     3|  male|22.0| 7.25|
|  2|       1|     3|female|26.0|7.925|
+---+--------+------+------+----+-----+
only showing top 2 rows



In [14]:
# Physical plan of the raw DataFrame (simple, non-extended form).
df.explain(extended=False)

== Physical Plan ==
FileScan csv [_c0#16,Survived#17,Pclass#18,Sex#19,Age#20,Fare#21] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/home/jovyan/work/DiploUTN/datos/titanic.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<_c0:int,Survived:int,Pclass:int,Sex:string,Age:double,Fare:double>




In [15]:
# cube: counts for every combination of the two columns, including the
# subtotals per column and the grand total (shown as null in the output).
cube_counts = df.cube("Survived", "Pclass").count()
cube_counts.orderBy("Survived", "Pclass").show()

+--------+------+-----+
|Survived|Pclass|count|
+--------+------+-----+
|    null|  null|  891|
|    null|     1|  216|
|    null|     2|  184|
|    null|     3|  491|
|       0|  null|  549|
|       0|     1|   80|
|       0|     2|   97|
|       0|     3|  372|
|       1|  null|  342|
|       1|     1|  136|
|       1|     2|   87|
|       1|     3|  119|
+--------+------+-----+



In [16]:
# rollup: hierarchical counts -- (Survived, Pclass), subtotals per Survived,
# and the grand total; there are no subtotals per Pclass alone.
rollup_counts = df.rollup("Survived", "Pclass").count()
rollup_counts.orderBy("Survived", "Pclass").show()

+--------+------+-----+
|Survived|Pclass|count|
+--------+------+-----+
|    null|  null|  891|
|       0|  null|  549|
|       0|     1|   80|
|       0|     2|   97|
|       0|     3|  372|
|       1|  null|  342|
|       1|     1|  136|
|       1|     2|   87|
|       1|     3|  119|
+--------+------+-----+



In [17]:
# q4: SQL window function. Each returned row carries the number of Survived
# values in its (Pclass, Survived) partition; only ten rows are kept.
q4_sql = """SELECT
                Pclass,
                Fare,
                Survived,
                COUNT(Survived) OVER (PARTITION BY Pclass, Survived ORDER BY Pclass DESC) as cant_survive
            FROM
                titanic
            LIMIT
                10
          """
q4 = spark.sql(q4_sql)

In [18]:
# DataFrame-API counterpart of the SQL window query (q4).
# `Window` is imported in the notebook's top import cell together with the
# other pyspark imports, so this cell only builds and runs the query.

# Window over each (Pclass, Survived) group, ordered by Pclass descending.
# Pclass is also a partition key, so all rows of a partition tie in the
# ordering; the output shows the same count on every row of a group.
windowSpec = (
    Window
    .partitionBy("Pclass", "Survived")
    .orderBy(F.col("Pclass").desc())
)

# Attach the windowed count of Survived to each row and keep ten rows.
q3 = df.select(
    "Pclass",
    "Fare",
    "Survived",
    F.count("Survived").over(windowSpec).alias("cant_survived"),
).limit(10)

q3.show()

+------+-------+--------+-------------+
|Pclass|   Fare|Survived|cant_survived|
+------+-------+--------+-------------+
|     1|51.8625|       0|           80|
|     1|  263.0|       0|           80|
|     1|27.7208|       0|           80|
|     1|82.1708|       0|           80|
|     1|   52.0|       0|           80|
|     1|61.9792|       0|           80|
|     1| 83.475|       0|           80|
|     1|27.7208|       0|           80|
|     1|   47.1|       0|           80|
|     1| 61.175|       0|           80|
+------+-------+--------+-------------+



In [19]:
# Physical plan of the windowed query (simple, non-extended form).
q3.explain(extended=False)

== Physical Plan ==
CollectLimit 10
+- Window [count(Survived#17) windowspecdefinition(Pclass#18, Survived#17, Pclass#18 DESC NULLS LAST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS cant_survived#348L], [Pclass#18, Survived#17], [Pclass#18 DESC NULLS LAST]
   +- *(2) Sort [Pclass#18 ASC NULLS FIRST, Survived#17 ASC NULLS FIRST, Pclass#18 DESC NULLS LAST], false, 0
      +- Exchange hashpartitioning(Pclass#18, Survived#17, 200), true, [id=#447]
         +- *(1) Project [Pclass#18, Fare#21, Survived#17]
            +- FileScan csv [Survived#17,Pclass#18,Fare#21] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/home/jovyan/work/DiploUTN/datos/titanic.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Survived:int,Pclass:int,Fare:double>




In [20]:
# Persist the q4 result as CSV with a header line, replacing any previous
# output at the same path.
(
    q4.write
    .format('csv')
    .mode("overwrite")
    .option("header", "true")
    .save("datos/example")
)

In [21]:
# Read the CSV written in the previous step back in (header line present,
# types inferred) and print it to check the round trip.
reloaded = (
    spark.read
    .options(header="true", inferSchema="true")
    .csv('datos/example')
)
reloaded.show()

+------+-------+--------+------------+
|Pclass|   Fare|Survived|cant_survive|
+------+-------+--------+------------+
|     1|51.8625|       0|          80|
|     1|  263.0|       0|          80|
|     1|27.7208|       0|          80|
|     1|82.1708|       0|          80|
|     1|   52.0|       0|          80|
|     1|61.9792|       0|          80|
|     1| 83.475|       0|          80|
|     1|27.7208|       0|          80|
|     1|   47.1|       0|          80|
|     1| 61.175|       0|          80|
+------+-------+--------+------------+

