In [9]:
# Make the local Spark installation importable before touching pyspark.
import findspark
findspark.init()
from pyspark.sql import SparkSession
# Reuse the active session if one exists, otherwise create a new one.
spark = SparkSession.builder.getOrCreate()
# Use 4 shuffle partitions for the examples that follow.
spark.conf.set("spark.sql.shuffle.partitions", "4")

## Schemas

In [10]:
# Load the 2015 flight summary; no schema is supplied, so Spark derives one from the JSON.
df = spark.read.format('json').load('../data/flight-data/json/2015-summary.json')

In [11]:
# Print the schema as a tree (column name, type, nullability).
df.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)



In [12]:
# The same schema as a StructType object made of StructFields.
df.schema

StructType(List(StructField(DEST_COUNTRY_NAME,StringType,true),StructField(ORIGIN_COUNTRY_NAME,StringType,true),StructField(count,LongType,true)))

Custom Schema

You can define a custom schema instead of letting Spark infer it.

In [16]:
from pyspark.sql.types import StructField, StructType, StringType, LongType

# Explicit schema for the flight summary: two nullable string columns and a
# non-nullable long column that carries custom metadata.
myManualSchema = StructType(
    [
        StructField("DEST_COUNTRY_NAME", StringType(), True),
        StructField("ORIGIN_COUNTRY_NAME", StringType(), True),
        StructField("count", LongType(), False, metadata={"hello": "world"}),
    ]
)
# Read the same file again, applying the schema above instead of inferring one.
df = (
    spark.read.format("json")
    .schema(myManualSchema)
    .load("../data/flight-data/json/2015-summary.json")
)

## Columns and Expressions

In [20]:
from pyspark.sql.functions import col, column
# Both helpers build a Column from a name; the printed results are identical.
print(col("someColumnName"))
print(column("someColumnName"))

Column<b'someColumnName'>
Column<b'someColumnName'>


### Explicit column reference

In [32]:
# Indexing with a list of names returns a DataFrame restricted to those columns.
df[['count','DEST_COUNTRY_NAME']]

DataFrame[count: bigint, DEST_COUNTRY_NAME: string]

In [33]:
# Indexing with a single name returns a Column.
df['count']

Column<b'count'>

In [35]:
# col() builds a Column from the name alone, without going through a DataFrame.
col('count')

Column<b'count'>

### Expression

In [36]:
from pyspark.sql.functions import expr
# expr() parses a SQL-style string expression into a Column.
expr("(((someCol + 5) * 200) - 6) < otherCol")

Column<b'((((someCol + 5) * 200) - 6) < otherCol)'>

In [37]:
# The parsed expression is an ordinary pyspark Column object.
type(expr("(((someCol + 5) * 200) - 6) < otherCol"))

pyspark.sql.column.Column

#### Looping over the column names

In [38]:
# Column names as a plain Python list of strings.
df.columns

['DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME', 'count']

In [39]:
# Print every column name. The loop variable is deliberately not called `col`:
# that would rebind the `col` function imported from pyspark.sql.functions
# and break later `col(...)` calls until it is imported again.
for column_name in df.columns:
    print(column_name)

DEST_COUNTRY_NAME
ORIGIN_COUNTRY_NAME
count


## Records and row

In [40]:
# Fetch the first record as a Row object.
df.first()

Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15)

#### Creating Rows
Only a DataFrame has a schema; a single Row does not. You can create a Row by passing its values in order and then add it to the desired DataFrame.

In [41]:
from pyspark.sql import Row
# A Row built from positional values only; no field names are attached.
myRow = Row("Hello", None, 1, False)

In [43]:
# Row values are read by position.
print(myRow[0])
print(myRow[2])

Hello
1


## Creating Dataframe

In [48]:
# Reload the flight data without a manual schema.
df = spark.read.format('json').load('../data/flight-data/json/2015-summary.json')
# Register the DataFrame under the name 'dfTable' so SQL queries can refer to it.
df.createOrReplaceTempView('dfTable')
df.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)



manual dataframe

In [46]:
from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, StringType, LongType

# Three-column schema: two nullable strings and one non-nullable long.
myManualSchema = StructType(
    [
        StructField('some', StringType(), True),
        StructField('col', StringType(), True),
        StructField('names', LongType(), False),
    ]
)
# A single row whose values line up positionally with the schema fields.
myRow = Row('hello', None, 1)
myDf = spark.createDataFrame([myRow], myManualSchema)
myDf.show()

+-----+----+-----+
| some| col|names|
+-----+----+-----+
|hello|null|    1|
+-----+----+-----+



select and selectExpr

In [49]:
# SQL equivalent: SELECT DEST_COUNTRY_NAME FROM dfTable LIMIT 2
df.select('DEST_COUNTRY_NAME').show(2)

+-----------------+
|DEST_COUNTRY_NAME|
+-----------------+
|    United States|
|    United States|
+-----------------+
only showing top 2 rows



In [50]:
# Several column names can be passed to select() at once.
df.select('DEST_COUNTRY_NAME','ORIGIN_COUNTRY_NAME').show(2)

+-----------------+-------------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|
+-----------------+-------------------+
|    United States|            Romania|
|    United States|            Croatia|
+-----------------+-------------------+
only showing top 2 rows



You can also refer to a column with `col`, `column` or `expr`:

In [53]:
from pyspark.sql.functions import expr, col, column
# expr(), col() and column() all resolve to the same column here.
df.select(
    expr("DEST_COUNTRY_NAME"),
    col("DEST_COUNTRY_NAME"),
    column("DEST_COUNTRY_NAME"),
).show(2)

+-----------------+-----------------+-----------------+
|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|
+-----------------+-----------------+-----------------+
|    United States|    United States|    United States|
|    United States|    United States|    United States|
+-----------------+-----------------+-----------------+
only showing top 2 rows



Older guidance said not to mix Column objects and plain strings in one `select` (remember that `expr`, `col` and `column` return Column objects).
That is no longer true: mixing them now works.

In [57]:
# A Column object and a plain string name can be mixed in one select();
# append .show(2) to display the rows instead of the DataFrame summary.
df.select(col("DEST_COUNTRY_NAME"), "DEST_COUNTRY_NAME")

DataFrame[DEST_COUNTRY_NAME: string, DEST_COUNTRY_NAME: string]

`expr` lets us manipulate columns.
For example, rename a column with `AS`:

In [61]:
# Rename the column inside the SQL expression with AS.
df.select(expr("DEST_COUNTRY_NAME AS destination")).show(2)

+-------------+
|  destination|
+-------------+
|United States|
|United States|
+-------------+
only showing top 2 rows



Calling `alias` on the expression changes the name back:

In [64]:
# alias() is applied on top of the expression's own AS, so the final name is DEST_COUNTRY_NAME again.
df.select(expr("DEST_COUNTRY_NAME as destination").alias("DEST_COUNTRY_NAME")).show(2)

+-----------------+
|DEST_COUNTRY_NAME|
+-----------------+
|    United States|
|    United States|
+-----------------+
only showing top 2 rows



Because `select` followed by a series of `expr` calls is such a common pattern, Spark provides `selectExpr` to reduce the verbosity.

selectExpr('stringExpr','stringExp',....,)

In [66]:
# Each string argument is parsed as an expression, so aliases can be written inline.
df.selectExpr('DEST_COUNTRY_NAME as destination', 'DEST_COUNTRY_NAME').show(2)

+-------------+-----------------+
|  destination|DEST_COUNTRY_NAME|
+-------------+-----------------+
|United States|    United States|
|United States|    United States|
+-------------+-----------------+
only showing top 2 rows



In [67]:
# Any non-aggregate expression can be added to produce a new column.
df.selectExpr(
    "*",  # all original columns
    "(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry",
).show(2)

+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|        false|
|    United States|            Croatia|    1|        false|
+-----------------+-------------------+-----+-------------+
only showing top 2 rows



We can also aggregate over the entire DataFrame:

In [78]:
# Session-wide setting: the hash-partitioned Exchange in the plan below uses 20 partitions.
spark.conf.set("spark.sql.shuffle.partitions", "20")
# Aggregate over the whole DataFrame: average of `count` and number of distinct destinations.
df.selectExpr("avg(count)","count(distinct(DEST_COUNTRY_NAME))").show(2)
# Print the physical plan of the same query.
df.selectExpr("avg(count)","count(distinct(DEST_COUNTRY_NAME))").explain()

+-----------+---------------------------------+
| avg(count)|count(DISTINCT DEST_COUNTRY_NAME)|
+-----------+---------------------------------+
|1770.765625|                              132|
+-----------+---------------------------------+

== Physical Plan ==
*HashAggregate(keys=[], functions=[avg(count#131L), count(distinct DEST_COUNTRY_NAME#129)])
+- Exchange SinglePartition
   +- *HashAggregate(keys=[], functions=[merge_avg(count#131L), partial_count(distinct DEST_COUNTRY_NAME#129)])
      +- *HashAggregate(keys=[DEST_COUNTRY_NAME#129], functions=[merge_avg(count#131L)])
         +- Exchange hashpartitioning(DEST_COUNTRY_NAME#129, 20)
            +- *HashAggregate(keys=[DEST_COUNTRY_NAME#129], functions=[partial_avg(count#131L)])
               +- *FileScan json [DEST_COUNTRY_NAME#129,count#131L] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/media/francesco/Data/Spark-The-Definitive-Guide/data/flight-data/json/201..., PartitionFilters: [], PushedFilters: [], ReadS

In [79]:
# Same query with 5 shuffle partitions; only the partition count of the
# hash-partitioned Exchange in the plan changes, the result is the same.
spark.conf.set("spark.sql.shuffle.partitions", "5")
df.selectExpr("avg(count)","count(distinct(DEST_COUNTRY_NAME))").show(2)
df.selectExpr("avg(count)","count(distinct(DEST_COUNTRY_NAME))").explain()

+-----------+---------------------------------+
| avg(count)|count(DISTINCT DEST_COUNTRY_NAME)|
+-----------+---------------------------------+
|1770.765625|                              132|
+-----------+---------------------------------+

== Physical Plan ==
*HashAggregate(keys=[], functions=[avg(count#131L), count(distinct DEST_COUNTRY_NAME#129)])
+- Exchange SinglePartition
   +- *HashAggregate(keys=[], functions=[merge_avg(count#131L), partial_count(distinct DEST_COUNTRY_NAME#129)])
      +- *HashAggregate(keys=[DEST_COUNTRY_NAME#129], functions=[merge_avg(count#131L)])
         +- Exchange hashpartitioning(DEST_COUNTRY_NAME#129, 5)
            +- *HashAggregate(keys=[DEST_COUNTRY_NAME#129], functions=[partial_avg(count#131L)])
               +- *FileScan json [DEST_COUNTRY_NAME#129,count#131L] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/media/francesco/Data/Spark-The-Definitive-Guide/data/flight-data/json/201..., PartitionFilters: [], PushedFilters: [], ReadSc

### Literals
In Spark you may need to pass a constant value as a column for later evaluation; `lit` does this.

In [84]:
from pyspark.sql.functions import lit
# Add a constant column named 'One' through the Column API ...
df.select(expr('*'), lit(1).alias('One')).show(2)
# ... and the same thing written as SQL expressions.
df.selectExpr('*','1 as One').show(2)

+-----------------+-------------------+-----+---+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|One|
+-----------------+-------------------+-----+---+
|    United States|            Romania|   15|  1|
|    United States|            Croatia|    1|  1|
+-----------------+-------------------+-----+---+
only showing top 2 rows

+-----------------+-------------------+-----+---+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|One|
+-----------------+-------------------+-----+---+
|    United States|            Romania|   15|  1|
|    United States|            Croatia|    1|  1|
+-----------------+-------------------+-----+---+
only showing top 2 rows



### Adding columns

`withColumn` takes two arguments: the new column's name and the expression that produces its values.

In [88]:
# New column holding the literal 1 in every row.
df.withColumn("numberOne", lit(1)).show(2)
# New boolean column: does the flight start and end in the same country?
df.withColumn("withinCountry", expr("ORIGIN_COUNTRY_NAME ==DEST_COUNTRY_NAME")).show(2)

+-----------------+-------------------+-----+---------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|numberOne|
+-----------------+-------------------+-----+---------+
|    United States|            Romania|   15|        1|
|    United States|            Croatia|    1|        1|
+-----------------+-------------------+-----+---------+
only showing top 2 rows

+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|        false|
|    United States|            Croatia|    1|        false|
+-----------------+-------------------+-----+-------------+
only showing top 2 rows



### Renaming columns

In [90]:
# withColumnRenamed returns a new DataFrame (df itself is not modified), so the
# result has to be used directly or assigned; a bare call would just be discarded.
df.withColumnRenamed('DEST_COUNTRY_NAME','dest').show(2)

+-------------+-------------------+-----+
|         dest|ORIGIN_COUNTRY_NAME|count|
+-------------+-------------------+-----+
|United States|            Romania|   15|
|United States|            Croatia|    1|
+-------------+-------------------+-----+
only showing top 2 rows



In [93]:
# Column names containing spaces or dashes must be wrapped in backticks (`)
# when they are referenced inside an expression string.
dfWithLongColName = df.withColumn("This Long Column-Name",expr("ORIGIN_COUNTRY_NAME"))
dfWithLongColName.selectExpr('`This Long Column-Name`','`This Long Column-Name` as `new col`').show(2)
dfWithLongColName.select(expr("`This Long Column-Name`")).columns

+---------------------+-------+
|This Long Column-Name|new col|
+---------------------+-------+
|              Romania|Romania|
|              Croatia|Croatia|
+---------------------+-------+
only showing top 2 rows



['This Long Column-Name']

### Case Sensitivity
Spark is case-insensitive by default, but you can change this behaviour (set the `spark.sql.caseSensitive` option to `true`).

In [99]:
# Both spellings resolve to the same column; the result header keeps the spelling used in the query.
df.select(expr('ORIGIN_COUNTRY_NAME')).show(2)
df.select(expr('origin_country_name')).show(2)

+-------------------+
|ORIGIN_COUNTRY_NAME|
+-------------------+
|            Romania|
|            Croatia|
+-------------------+
only showing top 2 rows

+-------------------+
|origin_country_name|
+-------------------+
|            Romania|
|            Croatia|
+-------------------+
only showing top 2 rows



In [106]:
# List the members of the underlying configuration object.
# NOTE(review): `_jconf` is a private attribute and this cell is exploratory only;
# it does not change the case-sensitivity setting.
dir(spark.conf._jconf)

['$lessinit$greater$default$1',
 'contains',
 'equals',
 'get',
 'getAll',
 'getClass',
 'getOption',
 'hashCode',
 'notify',
 'notifyAll',
 'set',
 'toString',
 'unset',
 'wait']

### Removing columns

In [112]:
# drop() returns a new DataFrame without the named column.
df.drop("ORIGIN_COUNTRY_NAME").columns

['DEST_COUNTRY_NAME', 'count']

In [113]:
# Several columns can be dropped in a single call.
dfWithLongColName.drop("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").columns

['count', 'This Long Column-Name']

### CAST column

In [116]:
# Add `count2`, a copy of `count` cast from bigint to int.
df.withColumn("count2", col("count").cast("int"))

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint, count2: int]

### Filtering rows
* 1) build expression as a String
* 2) build expression as a column manipulation

Apply a filter using either:
* 1) where
* 2) filter

For similarity with SQL we'll mostly use `where`.

In [120]:
# Four equivalent ways to keep rows with count < 2:
df.filter(col('count')<2).show(2)    # Column expression
df.filter('count<2').show(2)         # SQL string
df.filter(expr('count<2')).show(2)   # SQL string parsed with expr()
df.where(col('count')<2).show(2)     # where() used in place of filter()


+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
+-----------------+-------------------+-----+
only showing top 2 rows

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
+-----------------+-------------------+-----+
only showing top 2 rows

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
+-----------------+-------------------+-----+
only showing top 2 rows

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+----

Spark applies all filter conditions together, regardless of the order in which they are written. So to combine multiple AND filters, just chain them
sequentially and let Spark handle the rest.

In [121]:
# Two chained where() calls act as a logical AND of both conditions.
(
    df.where(col("count") < 2)
    .where(col("ORIGIN_COUNTRY_NAME") != "Croatia")
    .show(2)
)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|          Singapore|    1|
|          Moldova|      United States|    1|
+-----------------+-------------------+-----+
only showing top 2 rows



In [126]:
# Flights originating from Italy (Column expression).
df.where(col('ORIGIN_COUNTRY_NAME') == 'Italy').show(20)
# Flights to or from Italy (SQL string with OR).
df.where('ORIGIN_COUNTRY_NAME == "Italy" or DEST_COUNTRY_NAME == "Italy"').show(20)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|              Italy|  438|
+-----------------+-------------------+-----+

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|            Italy|      United States|  382|
|    United States|              Italy|  438|
+-----------------+-------------------+-----+



### Distinct

In [129]:
# Number of distinct (origin, destination) pairs.
df.select("ORIGIN_COUNTRY_NAME","DEST_COUNTRY_NAME").distinct().count()

256

In [130]:
# Number of distinct origin countries.
df.select("ORIGIN_COUNTRY_NAME").distinct().count()

125

## Random sample
Sampling can be done with or without replacement.

In [143]:
# Sampling parameters, reused by later cells.
seed = 5                 # fixed seed for the sampler
withReplacement = False  # sample without replacement
fraction = 0.5           # sampling fraction passed to sample()
df.sample(withReplacement, fraction, seed).count()

126

In [144]:
# Sum `count` over the sample in two equivalent ways. selectExpr() parses its
# string arguments as SQL, whereas select() treats a plain string as a column
# NAME; the aggregate therefore has to be wrapped in expr() for select(),
# otherwise Spark looks for a column literally called 'sum(count)' and fails.
df.sample(withReplacement, fraction, seed).selectExpr('sum(count)').show()
df.sample(withReplacement, fraction, seed).select(expr('sum(count)')).show()

+----------+
|sum(count)|
+----------+
|    412703|
+----------+



### Random Split

In [145]:
# Split df into two DataFrames using the weights 0.25 / 0.75 and the seed set earlier.
dataFrames = df.randomSplit([0.25, 0.75], seed)
# The first split (weight 0.25) is not larger than the second one.
dataFrames[0].count() > dataFrames[1].count() # False

False

### UNION
**Warning:** `union` matches columns by position, not by name or schema.

In [156]:
from pyspark.sql import Row

# Build two extra rows whose values line up positionally with df's schema.
schema = df.schema
newRows = [
    Row("New Country", "Other Country", 5),
    Row("New Country 2", "Other Country 3", 1),
]
newDF = spark.createDataFrame(newRows, schema)
newDF.show()
# Append the new rows, then keep rows with count = 1 that do not originate in the United States.
(
    df.union(newDF)
    .where("count = 1")
    .where(col("ORIGIN_COUNTRY_NAME") != "United States")
    .show()
)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|      New Country|      Other Country|    5|
|    New Country 2|    Other Country 3|    1|
+-----------------+-------------------+-----+

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
|    United States|          Gibraltar|    1|
|    United States|             Cyprus|    1|
|    United States|            Estonia|    1|
|    United States|          Lithuania|    1|
|    United States|           Bulgaria|    1|
|    United States|            Georgia|    1|
|    United States|            Bahrain|    1|
|    United States|   Papua New Guinea|    1|
|    United States|         Montenegro|    1|
|    United States|            Namibia|    1|
|    New Country 2|    Other Coun

### Sorting Rows
Rows can be sorted with `sort` or `orderBy`.

In [158]:
# Sort by a single column name.
df.sort("count").show(5)
# Order by several columns, given as names ...
df.orderBy("count", "DEST_COUNTRY_NAME").show(5)
# ... or as Column objects.
df.orderBy(col("count"), col("DEST_COUNTRY_NAME")).show(5)

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|               Malta|      United States|    1|
|Saint Vincent and...|      United States|    1|
|       United States|            Croatia|    1|
|       United States|          Gibraltar|    1|
|       United States|          Singapore|    1|
+--------------------+-------------------+-----+
only showing top 5 rows

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|     Burkina Faso|      United States|    1|
|    Cote d'Ivoire|      United States|    1|
|           Cyprus|      United States|    1|
|         Djibouti|      United States|    1|
|        Indonesia|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--

In [170]:
# NOTE: the imported desc/asc functions are not used in this cell; the sort
# direction is set through the Column.desc() / Column.asc() methods instead.
from pyspark.sql.functions import desc, asc
df.orderBy(expr("count").desc()).show(2)
df.orderBy(col("count").desc(),col("DEST_COUNTRY_NAME").asc()).show(2)
df.orderBy(col('ORIGIN_COUNTRY_NAME').asc(),col('DEST_COUNTRY_NAME').desc()).show(2)

+-----------------+-------------------+------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|
+-----------------+-------------------+------+
|    United States|      United States|370002|
|    United States|             Canada|  8483|
+-----------------+-------------------+------+
only showing top 2 rows

+-----------------+-------------------+------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|
+-----------------+-------------------+------+
|    United States|      United States|370002|
|    United States|             Canada|  8483|
+-----------------+-------------------+------+
only showing top 2 rows

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|             Angola|   13|
|    United States|           Anguilla|   38|
+-----------------+-------------------+-----+
only showing top 2 rows



### LIMIT

In [171]:
# Restrict the DataFrame to 5 rows before displaying it.
df.limit(5).show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
+-----------------+-------------------+-----+



### Repartition and Coalesce
https://hackernoon.com/managing-spark-partitions-with-coalesce-and-repartition-4050c57ad5c4

Another important optimization opportunity is to partition the data according to
some frequently filtered columns, which control the physical layout of data
across the cluster including the partitioning scheme and the number of partitions.
Repartition will incur a full shuffle of the data, regardless of whether one is
necessary. This means that you should typically only repartition when the future
number of partitions is greater than your current number of partitions or when
you are looking to partition by a set of columns:

In [173]:
# Only the last expression of a cell is displayed, so the partition count
# (1, per the inline note) is computed here but not shown.
df.rdd.getNumPartitions() #1
# repartition() returns a new DataFrame; the result is displayed but not kept.
df.repartition(5)

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]

If you know that you’re going to be filtering by a certain column often, it can be
worth repartitioning based on that column:

In [174]:
# Repartition by a column (result discarded: only the last expression is displayed).
df.repartition(col("DEST_COUNTRY_NAME"))
# You can optionally specify the number of partitions you would like, too:
df.repartition(5,col("DEST_COUNTRY_NAME"))

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]

In [180]:
# Physical plan of df itself: a plain file scan with no Exchange step.
df.explain()

== Physical Plan ==
*FileScan json [DEST_COUNTRY_NAME#129,ORIGIN_COUNTRY_NAME#130,count#131L] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/media/francesco/Data/Spark-The-Definitive-Guide/data/flight-data/json/201..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:bigint>


In [181]:
# Repartitioning adds an Exchange (hash partitioning on the column, 5 partitions) to the plan.
df.repartition(5,col("DEST_COUNTRY_NAME")).explain()

== Physical Plan ==
Exchange hashpartitioning(DEST_COUNTRY_NAME#129, 5)
+- *FileScan json [DEST_COUNTRY_NAME#129,ORIGIN_COUNTRY_NAME#130,count#131L] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/media/francesco/Data/Spark-The-Definitive-Guide/data/flight-data/json/201..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:bigint>


Coalesce, on the other hand, will not incur a full shuffle and will try to combine
partitions. This operation will shuffle your data into five partitions based on the
destination country name, and then coalesce them (without a full shuffle):

In [183]:
# coalesce(2) appears as a Coalesce step on top of the Exchange; no second Exchange is added.
df.repartition(5, col("DEST_COUNTRY_NAME")).coalesce(2).explain()

== Physical Plan ==
Coalesce 2
+- Exchange hashpartitioning(DEST_COUNTRY_NAME#129, 5)
   +- *FileScan json [DEST_COUNTRY_NAME#129,ORIGIN_COUNTRY_NAME#130,count#131L] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/media/francesco/Data/Spark-The-Definitive-Guide/data/flight-data/json/201..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:bigint>


### Collecting Rows to the Driver

Sometimes you want to collect data to the driver:
* show  prints out a number of rows nicely
* collect gets all data from the entire DataFrame
* take  selects the first N rows

In [185]:
# Work on a 10-row DataFrame so that collecting everything stays cheap.
collectDF = df.limit(10)
collectDF.take(5) # take works with an Integer count; result not displayed (not the last expression)
collectDF.show() # this prints it out nicely
collectDF.show(5, False) # first 5 rows, with the second argument set to False
collectDF.collect() # all rows as a list of Row objects (displayed as the cell result)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
|    United States|          Singapore|    1|
|    United States|            Grenada|   62|
|       Costa Rica|      United States|  588|
|          Senegal|      United States|   40|
|          Moldova|      United States|    1|
+-----------------+-------------------+-----+

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|United States    |Romania            |15   |
|United States    |Croatia            |1    |
|United States    |Ireland            |344  |
|Egypt            |United States      |15   |
|United States    |India         

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344),
 Row(DEST_COUNTRY_NAME='Egypt', ORIGIN_COUNTRY_NAME='United States', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='India', count=62),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Grenada', count=62),
 Row(DEST_COUNTRY_NAME='Costa Rica', ORIGIN_COUNTRY_NAME='United States', count=588),
 Row(DEST_COUNTRY_NAME='Senegal', ORIGIN_COUNTRY_NAME='United States', count=40),
 Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1)]

toLocalIterator collects partitions to the driver
as an iterator. This method allows you to iterate over the entire dataset partition-
by-partition in a serial manner

In [188]:
# Stream the rows to the driver and print them one at a time. The iterator is
# created only in the `for` statement: a separate toLocalIterator() call whose
# result is never consumed serves no purpose.
for item in collectDF.toLocalIterator():
    print(item)

Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15)
Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1)
Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344)
Row(DEST_COUNTRY_NAME='Egypt', ORIGIN_COUNTRY_NAME='United States', count=15)
Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='India', count=62)
Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1)
Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Grenada', count=62)
Row(DEST_COUNTRY_NAME='Costa Rica', ORIGIN_COUNTRY_NAME='United States', count=588)
Row(DEST_COUNTRY_NAME='Senegal', ORIGIN_COUNTRY_NAME='United States', count=40)
Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1)
