In [6]:
import findspark
findspark.init()
import pyspark
import random
# Number of random points drawn for the Monte Carlo estimate (10^8).
num_samples = 10 ** 8
# Create the SparkContext for this job; the application name is "Pi".
sc = pyspark.SparkContext(appName="Pi")
def inside(p):
    """Draw one random point in the unit square and test it against the unit circle.

    The argument ``p`` is ignored; it exists only so the function can be used
    as a one-argument predicate (it is passed to ``filter``).

    Returns:
        True when the drawn point (x, y) satisfies x^2 + y^2 < 1, else False.
    """
    # Two draws per call, x first and then y.
    x = random.random()
    y = random.random()
    squared_distance = x * x + y * y
    return squared_distance < 1
# Run the Monte Carlo job: distribute num_samples elements, keep those for
# which inside() reports a hit, and count them.
# The try/finally guarantees the SparkContext is stopped even when the job
# raises; otherwise the context would stay alive and creating a new one on a
# re-run of the notebook would be expected to fail.
try:
    count = sc.parallelize(range(0, num_samples)).filter(inside).count()
    # The hit ratio approximates pi / 4. The float literal keeps the division
    # fractional regardless of the interpreter's integer-division rules.
    pi = 4.0 * count / num_samples
    print(pi)
finally:
    sc.stop()

3.14161348


In [7]:
# Initialize the Spark session (getOrCreate reuses an existing one if present).
spark = (
    pyspark.sql.SparkSession
    .builder
    .getOrCreate()
)

# Build a DataFrame with a single column, "number", holding 1000 rows with
# the values 0 through 999.
myRange = (
    spark
    .range(1000)
    .toDF("number")
)

In [8]:
# TRANSFORMATION: keep only the even numbers of the DataFrame (rows whose
# "number" value satisfies number % 2 = 0).
divisBy = myRange.where("number % 2 = 0")

In [9]:
# ACTION: count the rows of the filtered DataFrame.
divisBy.count()

500

In [11]:
# Load a CSV file into a DataFrame: the "header" option takes the column names
# from the first line and "inferSchema" asks Spark to infer the column types.
fligthData2015 = (
    spark.read
    .option("inferSchema", "true")
    .option("header", "true")
    .csv("2015-summary.csv")
)

In [12]:
# Peek at the data: fetch the first six rows as a list of Row objects.
fligthData2015.head(6)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344),
 Row(DEST_COUNTRY_NAME='Egypt', ORIGIN_COUNTRY_NAME='United States', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='India', count=62),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1)]

In [13]:
# take(3) fetches the first three rows as a list of Row objects.
fligthData2015.take(3)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344)]

In [14]:
# explain() prints the physical plan Spark will use to execute this query
# (here: sorting the flight data by the "count" column).
fligthData2015.sort("count").explain()

== Physical Plan ==
*Sort [count#19 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(count#19 ASC NULLS FIRST, 200)
   +- *FileScan csv [DEST_COUNTRY_NAME#17,ORIGIN_COUNTRY_NAME#18,count#19] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/D:/APRENDER/Spark/GuiaDefinitivaSpark/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>


In [15]:
# By default, when a shuffle is performed, Spark produces 200 shuffle
# partitions (see the Exchange step in the plan printed for the sort).
# The line below lowers that setting to 5.
spark.conf.set("spark.sql.shuffle.partitions", "5")

In [16]:
# Sort by the "count" column and fetch the first two rows of the result.
fligthData2015.sort("count").take(2)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1),
 Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1)]

In [17]:
# Register the DataFrame as a temporary view named "flight_data_2015" so it
# can be queried with SQL; it could also be turned into a table instead.
fligthData2015.createOrReplaceTempView("flight_data_2015")

In [19]:
# The aggregation expressed as SQL against the temporary view:
# number of rows per destination country.
sqlWay = spark.sql("""
SELECT DEST_COUNTRY_NAME, count(1)
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
""")

# The same aggregation expressed through the DataFrame API.
dataFrameWay = (
    fligthData2015
    .groupBy("DEST_COUNTRY_NAME")
    .count()
)

# Print both physical plans (SQL version first) so they can be compared.
for planned_query in (sqlWay, dataFrameWay):
    planned_query.explain()

== Physical Plan ==
*HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[count(1)])
+- Exchange hashpartitioning(DEST_COUNTRY_NAME#17, 5)
   +- *HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[partial_count(1)])
      +- *FileScan csv [DEST_COUNTRY_NAME#17] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/D:/APRENDER/Spark/GuiaDefinitivaSpark/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>
== Physical Plan ==
*HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[count(1)])
+- Exchange hashpartitioning(DEST_COUNTRY_NAME#17, 5)
   +- *HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[partial_count(1)])
      +- *FileScan csv [DEST_COUNTRY_NAME#17] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/D:/APRENDER/Spark/GuiaDefinitivaSpark/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>


In [20]:
# Using a SQL query: largest value of the "count" column in the view.
spark.sql("SELECT max(count) from flight_data_2015").take(1)

[Row(max(count)=370002)]

In [22]:
# Using DataFrame syntax: largest value of the "count" column.
# The functions module is imported under an alias rather than with
# `from pyspark.sql.functions import max`, because that form rebinds the
# built-in `max` for every cell executed afterwards in the same kernel.
from pyspark.sql import functions as F
fligthData2015.select(F.max("count")).take(1)

[Row(max(count)=370002)]

In [23]:
# First multi-transformation query; we start with a fairly simple SQL
# statement: total "count" per destination country, ordered from largest to
# smallest total and limited to the first five rows.

maxSql = spark.sql("""
SELECT DEST_COUNTRY_NAME, sum(count) as destination_total
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
ORDER BY sum(count) DESC
LIMIT 5
""")

# Display the resulting rows as a table.
maxSql.show()

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [26]:
from pyspark.sql.functions import desc

# DataFrame version of the top-five query: sum "count" per destination,
# rename the aggregate column, sort by it in descending order, keep five rows
# and display them.
(
    fligthData2015
    .groupBy("DEST_COUNTRY_NAME")
    .sum("count")
    .withColumnRenamed("sum(count)", "destination_total")
    .sort(desc("destination_total"))
    .limit(5)
    .show()
)

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [28]:
# Rebuild the top-five query in named steps and print its physical plan
# instead of executing it.
# Step 1: total "count" per destination, with a readable column name.
totals_by_destination = (
    fligthData2015
    .groupBy("DEST_COUNTRY_NAME")
    .sum("count")
    .withColumnRenamed("sum(count)", "destination_total")
)
# Step 2: order by the total, largest first, and keep five rows.
top_five_destinations = totals_by_destination.sort(desc("destination_total")).limit(5)
top_five_destinations.explain()

== Physical Plan ==
TakeOrderedAndProject(limit=5, orderBy=[destination_total#152L DESC NULLS LAST], output=[DEST_COUNTRY_NAME#17,destination_total#152L])
+- *HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[sum(cast(count#19 as bigint))])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#17, 5)
      +- *HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[partial_sum(cast(count#19 as bigint))])
         +- *FileScan csv [DEST_COUNTRY_NAME#17,count#19] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/D:/APRENDER/Spark/GuiaDefinitivaSpark/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,count:int>
