In [122]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.remote("sc://192.168.2.20:15002").getOrCreate()

from pyspark.sql.types import StructField, StructType, StringType, LongType

explicitSchema = StructType([
StructField("DEST_COUNTRY_NAME", StringType(), False),
StructField("ORIGIN_COUNTRY_NAME", StringType(), False),
StructField("count", LongType(), False, metadata={"hello":"world"})
])

df = spark\
    .read\
    .format("json")\
    .schema(explicitSchema)\
    .load("datasets/2015-summary.json")
    
df.createOrReplaceTempView("dfTable")

In [6]:
from pyspark.sql.functions import col, column, expr, lit

# SELECT *, 1 as numberOne FROM dfTable LIMIT 2
df.withColumn("numberOne", lit(1)).show(2)

# SELECT *, ORIGIN_COUNTRY_NAME == DEST_COUNTRY_NAME as withinCountry FROM dfTable LIMIT 2
df.withColumn("withinCountry", expr("ORIGIN_COUNTRY_NAME == DEST_COUNTRY_NAME")).show(2)

# SELECT *, DEST_COUNTRY_NAME as Destination FROM dfTable LIMIT 2
df.withColumn("Destination", col("DEST_COUNTRY_NAME")).show(2)

+-----------------+-------------------+-----+---------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|numberOne|
+-----------------+-------------------+-----+---------+
|    United States|            Romania|   15|        1|
|    United States|            Croatia|    1|        1|
+-----------------+-------------------+-----+---------+
only showing top 2 rows

+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|        false|
|    United States|            Croatia|    1|        false|
+-----------------+-------------------+-----+-------------+
only showing top 2 rows

+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|  Destination|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|United States|
|    United St

In [7]:
df.withColumnRenamed("DEST_COUNTRY_NAME", "dest").columns

['dest', 'ORIGIN_COUNTRY_NAME', 'count']

In [None]:
from pyspark.sql.functions import expr

# No escape quotes
dfWithLongColName = df.withColumn(
    "This Long Column-Name", 
    expr("ORIGIN_COUNTRY_NAME"))

# Escape quotes
dfWithLongColName.selectExpr(
    "`This Long Column-Name`",
    "`This Long Column-Name` as `new col`")\
    .show(2)
    
print(dfWithLongColName.select(col("This Long Column-Name")).columns)
print(dfWithLongColName.select(expr("`This Long Column-Name`")).columns)

# Nota: Spark es case-insensitive
spark.conf.get("spark.sql.caseSensitive")


+---------------------+-------+
|This Long Column-Name|new col|
+---------------------+-------+
|              Romania|Romania|
|              Croatia|Croatia|
+---------------------+-------+
only showing top 2 rows

['This Long Column-Name']
['This Long Column-Name']


'false'

In [20]:
print(df.drop("ORIGIN_COUNTRY_NAME").columns)

print(dfWithLongColName.drop("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").columns)

print(dfWithLongColName.columns)

['DEST_COUNTRY_NAME', 'count']
['count', 'This Long Column-Name']
['DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME', 'count', 'This Long Column-Name']


In [27]:
df.withColumn("count2", col("count").cast("float")).show(2)

spark.sql("SELECT *, cast(count as float) AS count2 FROM dfTable").show(2)

+-----------------+-------------------+-----+------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|count2|
+-----------------+-------------------+-----+------+
|    United States|            Romania|   15|  15.0|
|    United States|            Croatia|    1|   1.0|
+-----------------+-------------------+-----+------+
only showing top 2 rows

+-----------------+-------------------+-----+------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|count2|
+-----------------+-------------------+-----+------+
|    United States|            Romania|   15|  15.0|
|    United States|            Croatia|    1|   1.0|
+-----------------+-------------------+-----+------+
only showing top 2 rows



In [29]:
df.filter(col("count") < 2).show(2)

df.where("count < 2").show(2)

# SELECT * FROM dfTable WHERE count < 2 AND ORIGIN_COUNTRY_NAME != "Croatia" LIMIT 2
df.where(col("count") < 2).where(col("ORIGIN_COUNTRY_NAME") != "Croatia")\
 .show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
+-----------------+-------------------+-----+
only showing top 2 rows

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
+-----------------+-------------------+-----+
only showing top 2 rows

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|          Singapore|    1|
|          Moldova|      United States|    1|
+-----------------+-------------------+-----+
only showing top 2 rows



In [30]:
# SELECT COUNT(DISTINCT(ORIGIN_COUNTRY_NAME, DEST_COUNTRY_NAME)) FROM dfTable
print(df.select("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").distinct().count())

# SELECT COUNT(DISTINCT ORIGIN_COUNTRY_NAME) FROM dfTable
print(df.select("ORIGIN_COUNTRY_NAME").distinct().count())


256
125


In [None]:
import random

seed = 5 # random.randint(0, 100)
withReplacement = False
fraction = 0.5
df.sample(withReplacement, fraction, seed).show(2)


+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
+-----------------+-------------------+-----+
only showing top 2 rows



In [88]:
dataFrames = df.randomSplit([0.25, 0.25, 0.25, 0.25], seed)

print([df.count() for df in dataFrames])

[71, 67, 65, 53]


In [99]:
from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, StringType, LongType

schema0 = StructType([
    StructField("NAME", StringType(), True),
    StructField("LAST_NAME", StringType(), True),
    StructField("AGE", LongType(), False)
])

rows0 = [
    Row("John", "Doe", 25),
    Row("Jane", "Doe", 22),
    Row("Jill", "Doe", 20),
    Row("Jack", "Doe", 18)
]

schema1 = StructType([
    StructField("LAST_NAME", StringType(), True),
    StructField("NAME", StringType(), True),
    StructField("AGE", LongType(), False)
])

rows1 = [
    Row("Doe", "John", 25),
    Row("Doe", "Jane", 22),
    Row("Doe", "Jill", 20),
    Row("Doe", "Jack", 18)
]

schema2 = StructType([
    StructField("NAME", StringType(), True),
    StructField("AGE", LongType(), False),
    StructField("LAST_NAME", StringType(), True)
])

rows2 = [
    Row("John", 25, "Doe"),
    Row("Jane", 22, "Doe"),
    Row("Jill", 20, "Doe"),
    Row("Jack", 18, "Doe")
]

df0 = spark.createDataFrame(rows0, schema0)
df1 = spark.createDataFrame(rows1, schema1)
df2 = spark.createDataFrame(rows2, schema2)

# Union df0 and df1
df0.union(df1).show()

# Union df2 and df1
df2.union(df1).schema




+----+---------+---+
|NAME|LAST_NAME|AGE|
+----+---------+---+
|John|      Doe| 25|
|Jane|      Doe| 22|
|Jill|      Doe| 20|
|Jack|      Doe| 18|
| Doe|     John| 25|
| Doe|     Jane| 22|
| Doe|     Jill| 20|
| Doe|     Jack| 18|
+----+---------+---+



StructType([StructField('NAME', StringType(), True), StructField('AGE', StringType(), True), StructField('LAST_NAME', StringType(), True)])

In [104]:
df.sort("count").show(5)
df.orderBy("count", "DEST_COUNTRY_NAME").show(5)
df.orderBy(col("count"), col("DEST_COUNTRY_NAME")).show(5)

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|               Malta|      United States|    1|
|Saint Vincent and...|      United States|    1|
|       United States|            Croatia|    1|
|       United States|          Gibraltar|    1|
|       United States|          Singapore|    1|
+--------------------+-------------------+-----+
only showing top 5 rows

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|     Burkina Faso|      United States|    1|
|    Cote d'Ivoire|      United States|    1|
|           Cyprus|      United States|    1|
|         Djibouti|      United States|    1|
|        Indonesia|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--

In [156]:

from pyspark.sql.functions import expr

# SELECT * FROM dfTable ORDER BY count DESC LIMIT 2
df.orderBy(expr("count desc")).show(2) # Preguntar por qué no funciona con "count desc"

# SELECT * FROM dfTable ORDER BY count DESC, DEST_COUNTRY_NAME ASC LIMIT 2
df.orderBy(col("count").desc(), col("DEST_COUNTRY_NAME").asc()).show(2)


+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|          Moldova|      United States|    1|
|    United States|            Croatia|    1|
+-----------------+-------------------+-----+
only showing top 2 rows

+-----------------+-------------------+------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|
+-----------------+-------------------+------+
|    United States|      United States|370002|
|    United States|             Canada|  8483|
+-----------------+-------------------+------+
only showing top 2 rows



In [158]:
# SELECT * FROM dfTable LIMIT 6
df.limit(5).show() # Preguntar diferencia entre limit y show

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
+-----------------+-------------------+-----+



In [171]:
print(df.repartition(10).explain())

print(df.repartition(col("DEST_COUNTRY_NAME")).explain())

print(df.repartition(5, col("DEST_COUNTRY_NAME")).explain())

print(df.repartition(5, col("DEST_COUNTRY_NAME")).coalesce(2).explain())

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Exchange RoundRobinPartitioning(10), REPARTITION_BY_NUM, [plan_id=7544]
   +- FileScan json [DEST_COUNTRY_NAME#10723,ORIGIN_COUNTRY_NAME#10724,count#10725L] Batched: false, DataFilters: [], Format: JSON, Location: InMemoryFileIndex(1 paths)[hdfs://node-master:9000/user/hadoop/datasets/2015-summary.json], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:bigint>


None
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Exchange hashpartitioning(DEST_COUNTRY_NAME#10733, 200), REPARTITION_BY_COL, [plan_id=7552]
   +- FileScan json [DEST_COUNTRY_NAME#10733,ORIGIN_COUNTRY_NAME#10734,count#10735L] Batched: false, DataFilters: [], Format: JSON, Location: InMemoryFileIndex(1 paths)[hdfs://node-master:9000/user/hadoop/datasets/2015-summary.json], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count

In [172]:
collectDF = df.limit(10)
collectDF.take(5) # take works with an Integer count
collectDF.show() # this prints it out nicely
collectDF.show(5, False)
collectDF.collect()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
|    United States|          Singapore|    1|
|    United States|            Grenada|   62|
|       Costa Rica|      United States|  588|
|          Senegal|      United States|   40|
|          Moldova|      United States|    1|
+-----------------+-------------------+-----+

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|United States    |Romania            |15   |
|United States    |Croatia            |1    |
|United States    |Ireland            |344  |
|Egypt            |United States      |15   |
|United States    |India         

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344),
 Row(DEST_COUNTRY_NAME='Egypt', ORIGIN_COUNTRY_NAME='United States', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='India', count=62),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Grenada', count=62),
 Row(DEST_COUNTRY_NAME='Costa Rica', ORIGIN_COUNTRY_NAME='United States', count=588),
 Row(DEST_COUNTRY_NAME='Senegal', ORIGIN_COUNTRY_NAME='United States', count=40),
 Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1)]