## A DataFrame is made up of rows and columns, and transformations modify or manipulate them

## Note: Columns can also be treated as expressions

In [0]:
spark

In [0]:
df = spark.read.format("json").load("dbfs:/FileStore/shared_uploads/creationsbyyogesh@gmail.com/2015_summary.json")

In [0]:
df.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)



Creating a DataFrame
1. We can create one by reading from a data source
2. We can create one manually

We have already created a DataFrame by reading JSON data above. We will register the same DataFrame as a temporary view so that we can run SQL queries on top of it:

In [0]:
df.createOrReplaceTempView("dfTable")

We can define a manual schema, which describes the columns of the DataFrame, and create Row objects holding our own values:

In [0]:
from pyspark.sql.types import StructField, StructType, StringType, LongType
from pyspark.sql import Row
myManualSchema = StructType(
  [
    StructField("Name", StringType(), False),
    StructField("Age", LongType(), False),
    StructField("Occupation", StringType(), True)
  ]
)
myRow = Row("Yogesh",30,"Data Engineer")
myNewDf = spark.createDataFrame([myRow],myManualSchema)

In [0]:
myNewDf.show()

+------+---+-------------+
|  Name|Age|   Occupation|
+------+---+-------------+
|Yogesh| 30|Data Engineer|
+------+---+-------------+



## select and selectExpr

Remember the temporary view "dfTable" we created above. We can query it with SQL statements, just like a table in a database:

In [0]:
%sql
select DEST_COUNTRY_NAME from dfTable LIMIT 2

DEST_COUNTRY_NAME
United States
United States


Or we can use the equivalent DataFrame method in Python:

In [0]:
df.select("DEST_COUNTRY_NAME").show(2)

+-----------------+
|DEST_COUNTRY_NAME|
+-----------------+
|    United States|
|    United States|
+-----------------+
only showing top 2 rows



Returning Multiple Columns

In [0]:
%sql
select DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME from dfTable LIMIT 2

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME
United States,Romania
United States,Croatia


In [0]:
df.select("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME").show(2)

+-----------------+-------------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|
+-----------------+-------------------+
|    United States|            Romania|
|    United States|            Croatia|
+-----------------+-------------------+
only showing top 2 rows



In [0]:
df.selectExpr("DEST_COUNTRY_NAME AS DEST", "DEST_COUNTRY_NAME").show(2)

+-------------+-----------------+
|         DEST|DEST_COUNTRY_NAME|
+-------------+-----------------+
|United States|    United States|
|United States|    United States|
+-------------+-----------------+
only showing top 2 rows



In [0]:
df.selectExpr("*","(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) AS WITHIN_COUNTRY")\
    .show(2)

+-----------------+-------------------+-----+--------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|WITHIN_COUNTRY|
+-----------------+-------------------+-----+--------------+
|    United States|            Romania|   15|         false|
|    United States|            Croatia|    1|         false|
+-----------------+-------------------+-----+--------------+
only showing top 2 rows



In [0]:
df.selectExpr("AVG(count)","count(distinct(ORIGIN_COUNTRY_NAME))").show(2)

+-----------+-----------------------------------+
| avg(count)|count(DISTINCT ORIGIN_COUNTRY_NAME)|
+-----------+-----------------------------------+
|1770.765625|                                125|
+-----------+-----------------------------------+



## Literals

In [0]:
from pyspark.sql.functions import lit, expr
df.select(expr("*"),lit(1).alias("One")).show(2)

+-----------------+-------------------+-----+---+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|One|
+-----------------+-------------------+-----+---+
|    United States|            Romania|   15|  1|
|    United States|            Croatia|    1|  1|
+-----------------+-------------------+-----+---+
only showing top 2 rows



lit cannot be used inside selectExpr, because selectExpr takes SQL expression strings rather than Column objects. In a SQL string, a literal is simply written directly:

In [0]:
# No lit() needed: inside a SQL expression string, the literal 1 is written directly
df.selectExpr("*","1 AS one").show(2)

+-----------------+-------------------+-----+---+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|one|
+-----------------+-------------------+-----+---+
|    United States|            Romania|   15|  1|
|    United States|            Croatia|    1|  1|
+-----------------+-------------------+-----+---+
only showing top 2 rows



## Adding Columns

In [0]:
df.withColumn("numberOne",lit(1)).show(2)

+-----------------+-------------------+-----+---------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|numberOne|
+-----------------+-------------------+-----+---------+
|    United States|            Romania|   15|        1|
|    United States|            Croatia|    1|        1|
+-----------------+-------------------+-----+---------+
only showing top 2 rows



### Note: Columns are nothing but expressions in Spark. Notice that the withColumn function takes two arguments: the column name and the expression that will create the value for each row in the DataFrame.

In [0]:
df.withColumn("withinCountry",expr("DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME")).show(2)

+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|        false|
|    United States|            Croatia|    1|        false|
+-----------------+-------------------+-----+-------------+
only showing top 2 rows



## Renaming Columns

In [0]:
df.withColumnRenamed("DEST_COUNTRY_NAME","Dest").show(2)

+-------------+-------------------+-----+
|         Dest|ORIGIN_COUNTRY_NAME|count|
+-------------+-------------------+-----+
|United States|            Romania|   15|
|United States|            Croatia|    1|
+-------------+-------------------+-----+
only showing top 2 rows



In [0]:
df.withColumnRenamed("DEST_COUNTRY_NAME","Dest").columns

Out[18]: ['Dest', 'ORIGIN_COUNTRY_NAME', 'count']

In [0]:
df.withColumnRenamed("DEST_COUNTRY_NAME","Dest").printSchema()

root
 |-- Dest: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)



## Reserved Characters and Keywords
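### Column names that contain reserved characters (such as spaces or dashes) or keywords must be escaped. In Spark, we do this by using backtick (`) characters.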

In [0]:
dfWithLongColName = df.withColumn("long Col Name",expr("DEST_COUNTRY_NAME"))
dfWithLongColName.show(2)

+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|long Col Name|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|United States|
|    United States|            Croatia|    1|United States|
+-----------------+-------------------+-----+-------------+
only showing top 2 rows



**We don’t need escape characters here because the first argument to withColumn is just a string
for the new column name. In this example, however, we need to use backticks because we’re
referencing a column in an expression:**

In [0]:
dfWithLongColName.selectExpr("*","`long Col Name` AS longCol").show(2)

+-----------------+-------------------+-----+-------------+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|long Col Name|      longCol|
+-----------------+-------------------+-----+-------------+-------------+
|    United States|            Romania|   15|United States|United States|
|    United States|            Croatia|    1|United States|United States|
+-----------------+-------------------+-----+-------------+-------------+
only showing top 2 rows



## Removing Columns

In [0]:
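# drop() ignores names that don't exist: "longCol" was only created inside the selectExpr above and never saved
dfWithLongColName.drop("long Col Name","longCol").columns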

Out[24]: ['DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME', 'count']

## Changing a Column’s Type (cast)

In [0]:
from pyspark.sql.functions import col
df.withColumn("count2",col("count").cast("int"))

Out[27]: DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint, count2: int]

## Filtering Rows

To filter rows, we create an expression that evaluates to true or false; rows for which the
expression is false are filtered out. The most common way to do this with DataFrames is to
create either an expression as a String or build an expression by using a set of column
manipulations. There are two methods to perform this operation: you can use where or filter,
and they both perform the same operation and accept the same argument types when used
with DataFrames. We will stick to where because of its familiarity to SQL; however, filter is
valid as well.

**It is best to build an expression and just use where. This is the least confusing way.**

In [0]:
df.where("count<2").show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
+-----------------+-------------------+-----+
only showing top 2 rows



In [0]:
%sql
select * from dfTable where count<2 LIMIT 2

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Croatia,1
United States,Singapore,1


**Instinctively, you might want to put multiple filters into the same expression. Although this is
possible, it is not always useful, because Spark automatically performs all filtering operations at
the same time regardless of the filter ordering. This means that if you want to specify multiple
AND filters, you can just chain them sequentially and let Spark handle the rest:**

In [0]:
df.where("count<2").where("ORIGIN_COUNTRY_NAME<>'Croatia'").show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|          Singapore|    1|
|          Moldova|      United States|    1|
+-----------------+-------------------+-----+
only showing top 2 rows



In [0]:
%sql
select * from dfTable where count<2 AND ORIGIN_COUNTRY_NAME <> 'Croatia' LIMIT 2

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Singapore,1
Moldova,United States,1


## Getting Unique Rows

In [0]:
df.select("DEST_COUNTRY_NAME").distinct().count()

Out[33]: 132

In [0]:
df.select("DEST_COUNTRY_NAME","ORIGIN_COUNTRY_NAME").distinct().count()

Out[34]: 256

## Random Samples

**fraction**
A value between 0 and 1 giving the approximate fraction of rows to return. For example, 0.1 returns roughly 10% of the rows; the sample is not guaranteed to contain exactly 10% of the records.

**seed**
The seed initializes the pseudo-random number generator used to pick rows. Calling sample with the same seed on the same data (with the same partitioning) returns the same sample every time. If no seed is given, Spark picks a random one, so each call can return a different sample.

**withReplacement**
Sometimes you may want a sample in which the same row can appear more than once. Setting withReplacement to True allows this; it defaults to False.

Example:
A sample taken with replacement might return: 0, 5, 9, 11, 14, 14, 16, 17, 21, 29, 33, 41, 42, 52, 52, 54, 58, 65, 65, 71, 76, 79, 85, 96
In this output, the values 14, 52 and 65 are repeated.

In [0]:
seed = 5
withReplacement = True
fraction = 0.1
df.sample(withReplacement, fraction, seed).count()

Out[5]: 29

## Random Splits
Random splits can be helpful when you need to break up your DataFrame into random “splits”
of the original DataFrame. This is often used with machine learning algorithms to create training,
validation, and test sets. In this next example, we’ll split our DataFrame into two different
DataFrames by setting the weights by which we will split the DataFrame (these are the
arguments to the function). Because this method is designed to be randomized, we will also
specify a seed (replace seed with a number of your choosing in the code block). It’s
important to note that if the proportions you specify for the DataFrames don't add up to one,
they will be normalized so that they do:

In [0]:
seed = 56
dataFrames = df.randomSplit([0.25,0.75],seed)
dataFrames[0].count()

Out[6]: 69

In [0]:
dataFrames[1].count()

Out[8]: 187

## Concatenating and Appending Rows (Union)
DataFrames are immutable. This means users cannot
append to DataFrames, because that would mean changing them. To append to a DataFrame, you must
union the original DataFrame with the new DataFrame. This just concatenates the two
DataFrames. To union two DataFrames, you must be sure that they have the same schema and
number of columns; otherwise, the union will fail. Note that union matches columns by position, not by name.

### Parallelize Method

Under the covers, quite a few things happen when you create an RDD. Let's start with the RDD creation and break down this code snippet:

myRDD = sc.parallelize([('Mike', 19), ('June', 18), ('Rachel', 16), ('Rob', 18), ('Scott', 17)])

Focusing first on the statement inside the sc.parallelize() method, we first created a Python list (that is, [A, B, ..., E]) composed of tuples (that is, ('Mike', 19), ('June', 18), ..., ('Scott', 17)). sc.parallelize() is the SparkContext's parallelize method, which creates a parallelized collection (an RDD) from that list. **This allows Spark to distribute the data across multiple nodes, instead of depending on a single node to process the data.**

In [0]:
df.schema

Out[4]: StructType([StructField('DEST_COUNTRY_NAME', StringType(), True), StructField('ORIGIN_COUNTRY_NAME', StringType(), True), StructField('count', LongType(), True)])

In [0]:
df_new_schema = df.schema
my_Rows = [
    ("India","India",10),
    ("Japan","Russia",20)
]
parallelized_rows = spark.sparkContext.parallelize(my_Rows)
df_new = spark.createDataFrame(parallelized_rows, df_new_schema)
df_new.show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|            India|              India|   10|
|            Japan|             Russia|   20|
+-----------------+-------------------+-----+



In [0]:
df.union(df_new)\
    .where("count in (10,20)")\
    .where("DEST_COUNTRY_NAME in ('India','Japan')")\
    .show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|            India|              India|   10|
|            Japan|             Russia|   20|
+-----------------+-------------------+-----+



## Sorting Rows
There are two equivalent operations to do this, sort and orderBy, which work the exact same way.
They accept both column expressions and strings, as well as multiple columns. The default is to
sort in ascending order:

In [0]:
df.sort("count").show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|          Moldova|      United States|    1|
|    United States|            Croatia|    1|
+-----------------+-------------------+-----+
only showing top 2 rows



In [0]:
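from pyspark.sql.functions import asc, desc, expr
# Caution: expr("count desc") parses as `count AS desc` (an alias), so this sorts ASCENDING, as the output shows
df.orderBy(expr("count desc")).show(2)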

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|          Moldova|      United States|    1|
|    United States|            Croatia|    1|
+-----------------+-------------------+-----+
only showing top 2 rows



In [0]:
from pyspark.sql.functions import asc, desc, expr
# Each expr("... desc") is only an alias, so both columns are sorted ascending here
df.orderBy(expr("count desc"),expr("DEST_COUNTRY_NAME desc")).show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|     Burkina Faso|      United States|    1|
|    Cote d'Ivoire|      United States|    1|
+-----------------+-------------------+-----+
only showing top 2 rows



In [0]:
from pyspark.sql.functions import asc, desc, expr, col
df.orderBy(col("DEST_COUNTRY_NAME").desc(),col("count").desc()).show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|           Zambia|      United States|    1|
|        Venezuela|      United States|  290|
+-----------------+-------------------+-----+
only showing top 2 rows



## Limit

In [0]:
df.limit(5).show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
+-----------------+-------------------+-----+



In [0]:
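# expr("count desc") is parsed as an alias, not a sort direction, so these are rows with the smallest count
df.orderBy(expr("count desc")).limit(5).show()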

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|               Malta|      United States|    1|
|Saint Vincent and...|      United States|    1|
|       United States|            Croatia|    1|
|       United States|          Gibraltar|    1|
|       United States|          Singapore|    1|
+--------------------+-------------------+-----+



## Repartition and Coalesce
Another important optimization opportunity is to partition the data according to some **frequently
filtered columns**. Repartition and coalesce control the physical layout of data across the cluster,
including the partitioning scheme and the number of partitions.
Repartition will incur a full shuffle of the data, regardless of whether one is necessary. This
means that you should typically **only repartition when the future number of partitions is greater
than your current number of partitions** or when you are looking to partition by a set of columns:

In [0]:
df.rdd.getNumPartitions()

Out[27]: 1

In [0]:
df.repartition(col("DEST_COUNTRY_NAME"))

Out[28]: DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]

In [0]:
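# Still 1: repartition() returns a new DataFrame and the result above was not assigned, so df is unchanged
df.rdd.getNumPartitions()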

Out[30]: 1

In [0]:
df.repartition(5,col("DEST_COUNTRY_NAME"))

Out[31]: DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]

In [0]:
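# Again 1: the repartitioned DataFrame above was not assigned, so this still inspects the original df
df.rdd.getNumPartitions()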

Out[32]: 1

Coalesce, on the other hand, will not incur a full shuffle and will try to combine partitions. The
following operation shuffles your data into five partitions based on the destination country name,
and then coalesces them into two (without a full shuffle):

In [0]:
df.repartition(5,col("DEST_COUNTRY_NAME")).coalesce(2)

Out[33]: DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]