# Quickstart

This is a short introduction and quickstart for the PySpark DataFrame API. PySpark DataFrames are lazily evaluated and are implemented on top of [RDD](https://spark.apache.org/docs/latest/rdd-programming-guide.html#overview)s. When Spark [transforms](https://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations) data, it does not compute the transformation immediately; it only plans how to compute it later. The computation starts only when an [action](https://spark.apache.org/docs/latest/rdd-programming-guide.html#actions) such as `collect()` is explicitly called.
This notebook shows the basic usage of the DataFrame API and is geared mainly toward new users. You can run the latest version of these examples yourself in a live notebook [here](https://mybinder.org/v2/gh/apache/spark/master?filepath=python%2Fdocs%2Fsource%2Fgetting_started%2Fquickstart.ipynb).
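
The lazy-evaluation behaviour described above can be illustrated without Spark. The following is a plain-Python analogy built on the lazy built-in `map` (an assumption made for illustration; it is not how Spark is implemented, and the names `double` and `calls` are hypothetical). Building the pipeline does no work, and only the final `list()` call, which stands in for an action such as `collect()`, triggers the computation.

```python
# Plain-Python analogy for lazy evaluation; this is not Spark code.
calls = []  # records every time the transformation actually runs


def double(x):
    calls.append(x)
    return x * 2


data = [1, 2, 3]

# "Transformation": builds a lazy pipeline, nothing is computed yet.
pipeline = map(double, data)
calls_before_action = list(calls)

# "Action": consuming the pipeline triggers the computation.
result = list(pipeline)

print(calls_before_action)  # []
print(result)               # [2, 4, 6]
```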

Other useful information is available on the Apache Spark documentation site; see the latest versions of [Spark SQL and DataFrames](https://spark.apache.org/docs/latest/sql-programming-guide.html), [RDD Programming Guide](https://spark.apache.org/docs/latest/rdd-programming-guide.html), [Structured Streaming Programming Guide](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html), [Spark Streaming Programming Guide](https://spark.apache.org/docs/latest/streaming-programming-guide.html) and [Machine Learning Library (MLlib) Guide](https://spark.apache.org/docs/latest/ml-guide.html).

PySpark applications start by initializing a `SparkSession`, which is the entry point of PySpark, as shown below. When running in the PySpark shell via the <code>pyspark</code> executable, the shell automatically creates the session in the variable <code>spark</code>.

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

## DataFrame Creation

A PySpark DataFrame can be created via `pyspark.sql.SparkSession.createDataFrame`, typically by passing a list of lists, tuples, dictionaries, or `pyspark.sql.Row`s, a [pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html), or an RDD consisting of such a list.
`pyspark.sql.SparkSession.createDataFrame` takes a `schema` argument to specify the schema of the DataFrame. When it is omitted, PySpark infers the schema by taking a sample from the data.

First, you can create a PySpark DataFrame from a list of rows.

In [2]:
from datetime import datetime, date
import pandas as pd
from pyspark.sql import Row

df = spark.createDataFrame([
    Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
    Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
    Row(a=3, b=4., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])
df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

Create a PySpark DataFrame with an explicit schema.

In [3]:
df = spark.createDataFrame([
    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
], schema='a long, b double, c string, d date, e timestamp')
df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

Create a PySpark DataFrame from a pandas DataFrame.

In [4]:
pandas_df = pd.DataFrame({
    'a': [1, 2, 3],
    'b': [2., 3., 4.],
    'c': ['string1', 'string2', 'string3'],
    'd': [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)],
    'e': [datetime(2000, 1, 1, 12, 0), datetime(2000, 1, 2, 12, 0), datetime(2000, 1, 3, 12, 0)]
})
df = spark.createDataFrame(pandas_df)
df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

Create a PySpark DataFrame from an RDD consisting of a list of tuples.

In [5]:
rdd = spark.sparkContext.parallelize([
    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
])
df = spark.createDataFrame(rdd, schema=['a', 'b', 'c', 'd', 'e'])
df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

The DataFrames created above all have the same results and schema.

In [6]:
# All the DataFrames above produce the same result.
df.show()
df.printSchema()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+

root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: date (nullable = true)
 |-- e: timestamp (nullable = true)



## Viewing Data

The top rows of a DataFrame can be displayed using `DataFrame.show()`.

In [7]:
df.show(1)

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
+---+---+-------+----------+-------------------+
only showing top 1 row



Alternatively, you can enable the `spark.sql.repl.eagerEval.enabled` configuration for eager evaluation of PySpark DataFrames in notebooks such as Jupyter. The number of rows to show can be controlled via the `spark.sql.repl.eagerEval.maxNumRows` configuration.

In [8]:
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)
df

a,b,c,d,e
1,2.0,string1,2000-01-01,2000-01-01 12:00:00
2,3.0,string2,2000-02-01,2000-01-02 12:00:00
3,4.0,string3,2000-03-01,2000-01-03 12:00:00


The rows can also be shown vertically. This is useful when rows are too long to show horizontally.

In [9]:
df.show(1, vertical=True)

-RECORD 0------------------
 a   | 1                   
 b   | 2.0                 
 c   | string1             
 d   | 2000-01-01          
 e   | 2000-01-01 12:00:00 
only showing top 1 row



You can see the DataFrame's schema and column names as follows:

In [10]:
df.columns

['a', 'b', 'c', 'd', 'e']

In [11]:
df.printSchema()

root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: date (nullable = true)
 |-- e: timestamp (nullable = true)



Show a summary of the DataFrame:

In [12]:
df.select("a", "b", "c").describe().show()

+-------+---+---+-------+
|summary|  a|  b|      c|
+-------+---+---+-------+
|  count|  3|  3|      3|
|   mean|2.0|3.0|   null|
| stddev|1.0|1.0|   null|
|    min|  1|2.0|string1|
|    max|  3|4.0|string3|
+-------+---+---+-------+
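
As a cross-check of the `mean` and `stddev` rows above, here is a small sketch that uses only Python's standard `statistics` module on the values of column `a`. That the sample standard deviation (dividing by n - 1) is the one matching the `1.0` in the table is an observation drawn from this output, not a statement taken from the text above.

```python
import statistics

a_values = [1.0, 2.0, 3.0]  # column `a` of the DataFrame above

mean_a = statistics.mean(a_values)
sample_std = statistics.stdev(a_values)       # divides by n - 1
population_std = statistics.pstdev(a_values)  # divides by n

print(mean_a)                    # 2.0
print(sample_std)                # 1.0
print(round(population_std, 3))  # 0.816
```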



`DataFrame.collect()` collects the distributed data to the driver as local Python data. Note that this can throw an out-of-memory error when the dataset is too large to fit in the driver's memory, because it collects all the data from the executors to the driver.

In [13]:
df.collect()

[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1), e=datetime.datetime(2000, 1, 1, 12, 0)),
 Row(a=2, b=3.0, c='string2', d=datetime.date(2000, 2, 1), e=datetime.datetime(2000, 1, 2, 12, 0)),
 Row(a=3, b=4.0, c='string3', d=datetime.date(2000, 3, 1), e=datetime.datetime(2000, 1, 3, 12, 0))]

To avoid an out-of-memory error, use `DataFrame.take()` or `DataFrame.tail()`, which return only a limited number of rows.

In [14]:
df.take(1)

[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1), e=datetime.datetime(2000, 1, 1, 12, 0))]

A PySpark DataFrame can also be converted back to a [pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html) to leverage pandas APIs. Note that `toPandas` also collects all the data to the driver, which can easily cause an out-of-memory error when the data is too large to fit in the driver's memory.

In [15]:
df.toPandas()

   a    b        c           d                   e
0  1  2.0  string1  2000-01-01 2000-01-01 12:00:00
1  2  3.0  string2  2000-02-01 2000-01-02 12:00:00
2  3  4.0  string3  2000-03-01 2000-01-03 12:00:00


## Selecting and Accessing Data

PySpark DataFrames are lazily evaluated, so simply selecting a column does not trigger computation; it returns a `Column` instance.

In [16]:
df.a

Column<b'a'>

In fact, most column-wise operations return `Column`s.

In [17]:
from pyspark.sql import Column
from pyspark.sql.functions import upper

type(df.c) == type(upper(df.c)) == type(df.c.isNull())

True

These `Column`s can be used to select columns from a DataFrame. For example, `DataFrame.select()` takes `Column` instances and returns another DataFrame.

In [18]:
df.select(df.c).show()

+-------+
|      c|
+-------+
|string1|
|string2|
|string3|
+-------+



Assign a new `Column` instance:

In [19]:
df.withColumn('upper_c', upper(df.c)).show()

+---+---+-------+----------+-------------------+-------+
|  a|  b|      c|         d|                  e|upper_c|
+---+---+-------+----------+-------------------+-------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|STRING1|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|STRING2|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|STRING3|
+---+---+-------+----------+-------------------+-------+



To select a subset of rows, use `DataFrame.filter()`.

In [20]:
df.filter(df.a == 1).show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
+---+---+-------+----------+-------------------+



## Applying a Function

PySpark supports various UDFs and APIs that allow users to execute native Python functions. See also the latest [Pandas UDFs](https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html#pandas-udfs-aka-vectorized-udfs) and [Pandas Function APIs](https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html#pandas-function-apis). For instance, the example below allows users to directly use the APIs of [a pandas Series](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.Series.html) within a native Python function.

In [21]:
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf('long')
def pandas_plus_one(series: pd.Series) -> pd.Series:
    # Simply plus one by using pandas Series.
    return series + 1

df.select(pandas_plus_one(df.a)).show()

+------------------+
|pandas_plus_one(a)|
+------------------+
|                 2|
|                 3|
|                 4|
+------------------+



Another example is `DataFrame.mapInPandas`, which allows users to directly use the APIs of a [pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html) without restrictions such as the result length.

In [22]:
def pandas_filter_func(iterator):
    for pandas_df in iterator:
        yield pandas_df[pandas_df.a == 1]

df.mapInPandas(pandas_filter_func, schema=df.schema).show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
+---+---+-------+----------+-------------------+



## Grouping Data

PySpark DataFrames also provide a way of handling grouped data using the common split-apply-combine strategy.
It groups the data by a certain condition, applies a function to each group, and then combines the results back into a DataFrame.

In [23]:
df = spark.createDataFrame([
    ['red', 'banana', 1, 10], ['blue', 'banana', 2, 20], ['red', 'carrot', 3, 30],
    ['blue', 'grape', 4, 40], ['red', 'carrot', 5, 50], ['black', 'carrot', 6, 60],
    ['red', 'banana', 7, 70], ['red', 'grape', 8, 80]], schema=['color', 'fruit', 'v1', 'v2'])
df.show()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|  red|banana|  1| 10|
| blue|banana|  2| 20|
|  red|carrot|  3| 30|
| blue| grape|  4| 40|
|  red|carrot|  5| 50|
|black|carrot|  6| 60|
|  red|banana|  7| 70|
|  red| grape|  8| 80|
+-----+------+---+---+



Grouping and then applying the `avg()` function to the resulting groups.

In [24]:
df.groupby('color').avg().show()

+-----+-------+-------+
|color|avg(v1)|avg(v2)|
+-----+-------+-------+
|  red|    4.8|   48.0|
|black|    6.0|   60.0|
| blue|    3.0|   30.0|
+-----+-------+-------+
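
The split-apply-combine steps behind `groupby('color').avg()` can be sketched in plain Python. This is only a model of the semantics using the standard library, not how Spark executes the query; the variable names are hypothetical.

```python
from collections import defaultdict

rows = [
    ('red', 'banana', 1, 10), ('blue', 'banana', 2, 20), ('red', 'carrot', 3, 30),
    ('blue', 'grape', 4, 40), ('red', 'carrot', 5, 50), ('black', 'carrot', 6, 60),
    ('red', 'banana', 7, 70), ('red', 'grape', 8, 80),
]

# Split: bucket the (v1, v2) values by color.
groups = defaultdict(list)
for color, _fruit, v1, v2 in rows:
    groups[color].append((v1, v2))

# Apply and combine: average each column within each group.
averages = {
    color: (sum(v1 for v1, _ in vals) / len(vals),
            sum(v2 for _, v2 in vals) / len(vals))
    for color, vals in groups.items()
}

print(averages)
# {'red': (4.8, 48.0), 'blue': (3.0, 30.0), 'black': (6.0, 60.0)}
```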



You can also apply a Python native function against each group by using pandas APIs.

In [25]:
def subtract_mean(pandas_df):
    # Center v1 within each group by subtracting the group mean
    return pandas_df.assign(v1=pandas_df.v1 - pandas_df.v1.mean())

df.groupby('color').applyInPandas(subtract_mean, schema=df.schema).show()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|  red|banana| -3| 10|
|  red|carrot| -1| 30|
|  red|carrot|  0| 50|
|  red|banana|  2| 70|
|  red| grape|  3| 80|
|black|carrot|  0| 60|
| blue|banana| -1| 20|
| blue| grape|  1| 40|
+-----+------+---+---+
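
The integer values in the `v1` column above can be reproduced with a plain-Python sketch of what the function passed to `applyInPandas` does to each group. It assumes that the float differences are truncated toward zero when they are stored in the integer `v1` column; that assumption is inferred from the output shown (for example, 1 - 4.8 = -3.8 appears as -3) and is not a documented guarantee.

```python
from collections import defaultdict

rows = [
    ('red', 'banana', 1, 10), ('blue', 'banana', 2, 20), ('red', 'carrot', 3, 30),
    ('blue', 'grape', 4, 40), ('red', 'carrot', 5, 50), ('black', 'carrot', 6, 60),
    ('red', 'banana', 7, 70), ('red', 'grape', 8, 80),
]

# Split the rows by color.
groups = defaultdict(list)
for row in rows:
    groups[row[0]].append(row)

# Apply: subtract the group mean of v1; int() truncates toward zero (assumption).
centered = []
for color, members in groups.items():
    mean_v1 = sum(r[2] for r in members) / len(members)
    for c, fruit, v1, v2 in members:
        centered.append((c, fruit, int(v1 - mean_v1), v2))

red_v1 = [r[2] for r in centered if r[0] == 'red']
print(red_v1)  # [-3, -1, 0, 2, 3]
```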



Co-grouping and applying a function.

In [26]:
df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ('time', 'id', 'v1'))

df2 = spark.createDataFrame(
    [(20000101, 1, 'x'), (20000101, 2, 'y')],
    ('time', 'id', 'v2'))

def asof_join(l, r):
    return pd.merge_asof(l, r, on='time', by='id')

df1.groupby('id').cogroup(df2.groupby('id')).applyInPandas(
    asof_join, schema='time int, id int, v1 double, v2 string').show()

+--------+---+---+---+
|    time| id| v1| v2|
+--------+---+---+---+
|20000101|  1|1.0|  x|
|20000102|  1|3.0|  x|
|20000101|  2|2.0|  y|
|20000102|  2|4.0|  y|
+--------+---+---+---+
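
To make the as-of join above more concrete, here is a plain-Python model of what `pd.merge_asof(l, r, on='time', by='id')` computes with its default backward direction: each left row is paired with the latest right row that has the same `id` and a `time` less than or equal to the left row's `time`. The helper `asof_join_rows` is hypothetical and only mirrors the semantics on this small dataset.

```python
from bisect import bisect_right

left = [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)]
right = [(20000101, 1, 'x'), (20000101, 2, 'y')]


def asof_join_rows(left_rows, right_rows):
    """Attach to each left row the latest right row with the same id and time <= left time."""
    # Index the right rows by id, each list sorted by time.
    by_id = {}
    for time, id_, v2 in sorted(right_rows):
        by_id.setdefault(id_, []).append((time, v2))

    joined = []
    for time, id_, v1 in left_rows:
        candidates = by_id.get(id_, [])
        times = [t for t, _ in candidates]
        pos = bisect_right(times, time)  # number of right rows with time <= left time
        v2 = candidates[pos - 1][1] if pos else None
        joined.append((time, id_, v1, v2))
    return joined


result = sorted(asof_join_rows(left, right), key=lambda r: (r[1], r[0]))
for row in result:
    print(row)
# (20000101, 1, 1.0, 'x')
# (20000102, 1, 3.0, 'x')
# (20000101, 2, 2.0, 'y')
# (20000102, 2, 4.0, 'y')
```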



## Getting Data in/out

CSV is straightforward and easy to use. Parquet and ORC are efficient, compact file formats that are faster to read and write.

There are many other data sources available in PySpark such as JDBC, text, binaryFile, Avro, etc. See also the latest [Spark SQL, DataFrames and Datasets Guide](https://spark.apache.org/docs/latest/sql-programming-guide.html) in Apache Spark documentation.

### CSV

In [27]:
df.write.csv('foo.csv', header=True)
spark.read.csv('foo.csv', header=True).show()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|  red|banana|  1| 10|
| blue|banana|  2| 20|
|  red|carrot|  3| 30|
| blue| grape|  4| 40|
|  red|carrot|  5| 50|
|black|carrot|  6| 60|
|  red|banana|  7| 70|
|  red| grape|  8| 80|
+-----+------+---+---+



### Parquet

In [28]:
df.write.parquet('bar.parquet')
spark.read.parquet('bar.parquet').show()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|  red|banana|  1| 10|
| blue|banana|  2| 20|
|  red|carrot|  3| 30|
| blue| grape|  4| 40|
|  red|carrot|  5| 50|
|black|carrot|  6| 60|
|  red|banana|  7| 70|
|  red| grape|  8| 80|
+-----+------+---+---+



### ORC

In [29]:
df.write.orc('zoo.orc')
spark.read.orc('zoo.orc').show()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|  red|banana|  1| 10|
| blue|banana|  2| 20|
|  red|carrot|  3| 30|
| blue| grape|  4| 40|
|  red|carrot|  5| 50|
|black|carrot|  6| 60|
|  red|banana|  7| 70|
|  red| grape|  8| 80|
+-----+------+---+---+



## Working with SQL

DataFrames and Spark SQL share the same execution engine, so they can be used interchangeably. For example, you can register a DataFrame as a table and run a SQL query against it as shown below:

In [30]:
df.createOrReplaceTempView("tableA")
spark.sql("SELECT count(*) from tableA").show()

+--------+
|count(1)|
+--------+
|       8|
+--------+



In addition, UDFs can be registered and invoked in SQL out of the box:

In [31]:
@pandas_udf("integer")
def add_one(s: pd.Series) -> pd.Series:
    return s + 1

spark.udf.register("add_one", add_one)
spark.sql("SELECT add_one(v1) FROM tableA").show()

+-----------+
|add_one(v1)|
+-----------+
|          2|
|          3|
|          4|
|          5|
|          6|
|          7|
|          8|
|          9|
+-----------+



These SQL expressions can directly be mixed and used as PySpark columns.

In [32]:
from pyspark.sql.functions import expr

df.selectExpr('add_one(v1)').show()
df.select(expr('count(*)') > 0).show()

+-----------+
|add_one(v1)|
+-----------+
|          2|
|          3|
|          4|
|          5|
|          6|
|          7|
|          8|
|          9|
+-----------+

+--------------+
|(count(1) > 0)|
+--------------+
|          true|
+--------------+



# SparkContext

In [2]:
from pyspark import SparkContext

# getOrCreate() reuses a running SparkContext instead of failing if one already exists
sc = SparkContext.getOrCreate()
sc.getConf().getAll()

[('spark.driver.host', 'jupyter-apache-2dspark-2d7d3kffdc'),
 ('spark.rdd.compress', 'True'),
 ('spark.driver.port', '37989'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.app.id', 'local-1622868867436'),
 ('spark.master', 'local[*]'),
 ('spark.submit.pyFiles', ''),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.app.name', 'pyspark-shell'),
 ('spark.app.startTime', '1622868865714')]

In [4]:
# Creating an RDD
names = sc.parallelize(['Adam','Cray','Shaun','Brain','Mark','Christ','Shail','Satya','Mark','Norby','Frans','Mark','Bill'])
print(type(names))

<class 'pyspark.rdd.RDD'>


In [5]:
a = names.collect()
print(a)
print(type(a))

['Adam', 'Cray', 'Shaun', 'Brain', 'Mark', 'Christ', 'Shail', 'Satya', 'Mark', 'Norby', 'Frans', 'Mark', 'Bill']
<class 'list'>


In [6]:
names.countByValue()

defaultdict(int,
            {'Adam': 1,
             'Cray': 1,
             'Shaun': 1,
             'Brain': 1,
             'Mark': 3,
             'Christ': 1,
             'Shail': 1,
             'Satya': 1,
             'Norby': 1,
             'Frans': 1,
             'Bill': 1})
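
For comparison, the same counts can be obtained locally with `collections.Counter`. This is a plain-Python sketch of what `countByValue()` returns for this list, not a description of how Spark computes it across partitions.

```python
from collections import Counter

names = ['Adam', 'Cray', 'Shaun', 'Brain', 'Mark', 'Christ', 'Shail',
         'Satya', 'Mark', 'Norby', 'Frans', 'Mark', 'Bill']

counts = Counter(names)

print(counts['Mark'])  # 3
print(len(counts))     # 11 distinct names
```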

## Glom in PySpark

In [12]:
num = sc.parallelize([5,5,4,3,2,9,2],1) # explicitly request 1 partition
num.collect()

[5, 5, 4, 3, 2, 9, 2]

In [13]:
num.glom().collect()

[[5, 5, 4, 3, 2, 9, 2]]

In [14]:
num = sc.parallelize([5,5,4,3,2,9,2],2) # 2 partitions
num.glom().collect()

[[5, 5, 4], [3, 2, 9, 2]]

In [15]:
num = sc.parallelize([5,5,4,3,2,9,2],3) # 3 partitions
num.glom().collect()

[[5, 5], [4, 3], [2, 9, 2]]

In [16]:
# glom() returns a PipelinedRDD
type(num.glom())

pyspark.rdd.PipelinedRDD

In [20]:
# Suppose we create more partitions than there are elements
num = sc.parallelize([5,5,4,3,2,9,2],10) # 10 partitions, some of them empty
num.glom().collect()

[[], [5], [5], [], [4], [3], [], [2], [9], [2]]
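
The partition layouts printed by `glom()` above can be reproduced with a small plain-Python model. The slicing rule used here (partition `i` receives the elements from index `i*n//k` up to `(i+1)*n//k`) is an assumption chosen because it matches every output shown in this section; the helper name `slice_into_partitions` is hypothetical.

```python
def slice_into_partitions(items, num_slices):
    """Split items into num_slices contiguous chunks (a model, not Spark code)."""
    n = len(items)
    return [items[i * n // num_slices:(i + 1) * n // num_slices]
            for i in range(num_slices)]


values = [5, 5, 4, 3, 2, 9, 2]

print(slice_into_partitions(values, 2))   # [[5, 5, 4], [3, 2, 9, 2]]
print(slice_into_partitions(values, 3))   # [[5, 5], [4, 3], [2, 9, 2]]
print(slice_into_partitions(values, 10))  # [[], [5], [5], [], [4], [3], [], [2], [9], [2]]
```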

## Reduce and Fold in PySpark


In [21]:
num.reduce(lambda a,b: a+b)

30

In [22]:
num.reduce(lambda a,b: a*b)

10800

In [26]:
# Fold Examples 
# Collect vs GLOM Collect
num = sc.parallelize([5,5,4,3,2,9,2],1)
num.collect()

[5, 5, 4, 3, 2, 9, 2]

In [27]:
num.glom().collect()

[[5, 5, 4, 3, 2, 9, 2]]

In [32]:
# With 1 partition
num = sc.parallelize([5,5,4,3,2,9,2],1)
print(num.collect())   # collect() returns all elements as one flat list
print(num.glom().collect()) # glom() returns one list per partition

[5, 5, 4, 3, 2, 9, 2]
[[5, 5, 4, 3, 2, 9, 2]]


In [44]:
# fold() applies the zero value once in every partition and once more when
# the per-partition results are merged, so the result depends on the number
# of partitions. Below, n is the largest partition count to try.

n = 3
for i in range(1,n+1):
    print(f"Number of partition are : {i}")
    num = sc.parallelize([5,5,4,3,2,9,2],i)
    print("Results from simple collect : ",num.collect())   # collect() returns all elements as one flat list
    print("Results from glom collect : ",num.glom().collect()) # glom() returns one list per partition
    print("------------------------------------------------------")
    # Addition with i partitions: fold returns sum + (i + 1) * zeroValue
    print("Results for addition")
    print("Reduce results : ",num.reduce(lambda a,b: a+b)) 
    print("Fold = 0 ",num.fold(0,lambda a,b:a+b))  
    print("Fold = 1 ",num.fold(1,lambda a,b:a+b))  
    print("Fold = 2 ",num.fold(2,lambda a,b:a+b))  
    print("------------------------------------------------------")
    # Multiplication with i partitions: fold returns product * zeroValue ** (i + 1)
    print("Results for Multiplication")
    print("Reduce results : ",num.reduce(lambda a,b: a*b)) 
    print("Fold = 0 ",num.fold(0,lambda a,b: a*b))  
    print("Fold = 1 ",num.fold(1,lambda a,b: a*b))  
    print("Fold = 2 ",num.fold(2,lambda a,b: a*b))  
    print("------------------------------------------------------")



Number of partition are : 1
Results from simple collect :  [5, 5, 4, 3, 2, 9, 2]
Results from glom collect :  [[5, 5, 4, 3, 2, 9, 2]]
------------------------------------------------------
Results for addition
Reduce results :  30
Fold = 0  30
Fold = 1  32
Fold = 2  34
------------------------------------------------------
Results for Multiplication
Reduce results :  10800
Fold = 0  0
Fold = 1  10800
Fold = 2  43200
------------------------------------------------------
Number of partition are : 2
Results from simple collect :  [5, 5, 4, 3, 2, 9, 2]
Results from glom collect :  [[5, 5, 4], [3, 2, 9, 2]]
------------------------------------------------------
Results for addition
Reduce results :  30
Fold = 0  30
Fold = 1  33
Fold = 2  36
------------------------------------------------------
Results for Multiplication
Reduce results :  10800
Fold = 0  0
Fold = 1  10800
Fold = 2  86400
------------------------------------------------------
Number of partition are : 3
Results from simple 
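
The dependence of `fold` on the number of partitions can be modelled in plain Python. The sketch below assumes that the zero value is used once as the starting value in every partition and once more when the per-partition results are merged; this assumption reproduces the results printed above for 1 and 2 partitions. The helper `model_fold` is hypothetical and is not part of PySpark.

```python
from functools import reduce
from operator import add, mul


def model_fold(partitions, zero, op):
    """Plain-Python model of fold over a list of partitions."""
    # Fold each partition, starting from the zero value.
    per_partition = [reduce(op, part, zero) for part in partitions]
    # Merge the per-partition results, again starting from the zero value.
    return reduce(op, per_partition, zero)


one_partition = [[5, 5, 4, 3, 2, 9, 2]]
two_partitions = [[5, 5, 4], [3, 2, 9, 2]]

print(model_fold(one_partition, 1, add))   # 32 = 30 + 2 * 1
print(model_fold(two_partitions, 1, add))  # 33 = 30 + 3 * 1
print(model_fold(one_partition, 2, mul))   # 43200 = 10800 * 2 ** 2
print(model_fold(two_partitions, 2, mul))  # 86400 = 10800 * 2 ** 3
```

Only a zero value that is neutral for the operation (0 for addition, 1 for multiplication) gives a result that does not depend on the partitioning.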

## Narrow and Wide Transformations

1. Narrow: `map`, `flatMap`, `filter`, `union`, `sample`
2. Wide: `groupBy`, `intersection`, `subtract`, `distinct`
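
The difference between the two kinds of transformation can be sketched with plain Python lists standing in for partitions. This is only a model: the hash-based redistribution step is an illustrative assumption about how equal values are brought together, and the variable names are hypothetical.

```python
partitions = [[5, 5, 4], [3, 2, 9, 2]]

# Narrow (like map): each output partition depends on exactly one input partition.
doubled = [[x * 2 for x in part] for part in partitions]

# Wide (like distinct): equal values may sit in different input partitions, so the
# elements are first redistributed ("shuffled") so that equal values meet in one place.
num_out = 2
shuffled = [[] for _ in range(num_out)]
for part in partitions:
    for x in part:
        shuffled[hash(x) % num_out].append(x)
distinct = [sorted(set(part)) for part in shuffled]

print(doubled)   # [[10, 10, 8], [6, 4, 18, 4]]
print(shuffled)  # [[4, 2, 2], [5, 5, 3, 9]]
print(distinct)  # [[2, 4], [3, 5, 9]]
```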

In [3]:
num = sc.parallelize([5,5,4,3,2,9,2])
num.collect()

[5, 5, 4, 3, 2, 9, 2]

## Map

In [4]:
num.map(lambda a : a*2).collect()

[10, 10, 8, 6, 4, 18, 4]

## FlatMap

In [5]:
rdd = sc.parallelize([ 2, 3,4])
rdd.collect()

[2, 3, 4]

In [6]:
(rdd.flatMap(lambda x: range(1, x)).collect())

[1, 1, 2, 1, 2, 3]
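
To see how `flatMap` differs from `map`, here is a plain-Python sketch on the same input. With `map`, each element yields one (possibly nested) result; with `flatMap`, the per-element results are flattened into a single sequence. The sketch uses `itertools.chain` and models only the semantics.

```python
from itertools import chain

values = [2, 3, 4]

# map-like: one result per input element, so the output is nested.
mapped = [list(range(1, x)) for x in values]

# flatMap-like: the per-element results are concatenated.
flat_mapped = list(chain.from_iterable(range(1, x) for x in values))

print(mapped)       # [[1], [1, 2], [1, 2, 3]]
print(flat_mapped)  # [1, 1, 2, 1, 2, 3]
```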

## Filter

In [7]:
num.filter(lambda x : x%2 == 0).collect()

[4, 2, 2]

## Union

In [8]:
num2 = sc.parallelize([1,7,9,4,10,15])
num2.collect()
num2.union(num).collect()

[1, 7, 9, 4, 10, 15, 5, 5, 4, 3, 2, 9, 2]

## Sample

In [9]:
parallel = sc.parallelize(range(1,10))
parallel.collect()
parallel.sample(True, .2).collect()  # with replacement; the result varies between runs

[6, 8, 9]

In [10]:
parallel.sample(False, .2).collect()  # without replacement; the result varies between runs

[1, 9]

## GroupBy

In [14]:
names = sc.parallelize([ "Bills", "Mark","Brain","Mick"])
names_gr = names.groupBy(lambda x : x[0]).collect()
names_gr

[('B', <pyspark.resultiterable.ResultIterable at 0x7faa06b71610>),
 ('M', <pyspark.resultiterable.ResultIterable at 0x7faa06b71450>)]

In [15]:
for (k,v) in names_gr:
    print(k,list(v) )

B ['Bills', 'Brain']
M ['Mark', 'Mick']


In [16]:
aa = sc.parallelize([1, 1, 2, 3, 5, 8])
result = aa.groupBy(lambda x: x % 2).collect()
result

[(1, <pyspark.resultiterable.ResultIterable at 0x7faa1a04dd10>),
 (0, <pyspark.resultiterable.ResultIterable at 0x7faa1a04dcd0>)]

In [17]:
for (k,v) in result:
    print(k,list(v))

1 [1, 1, 3, 5]
0 [2, 8]


## Intersection

In [20]:
print(num.collect())
print(num2.collect())
num.intersection(num2).collect()

[5, 5, 4, 3, 2, 9, 2]
[1, 7, 9, 4, 10, 15]


[4, 9]

## Subtract

In [21]:
num.subtract(num2).collect()

[2, 2, 5, 5, 3]

In [22]:
num2.subtract(num).collect()

[10, 1, 7, 15]
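
The outputs above show that `intersection` removes duplicates while `subtract` keeps them. A plain-Python sketch of those semantics follows; the order of elements returned by Spark is not guaranteed, so the comparison is made on sorted results.

```python
num = [5, 5, 4, 3, 2, 9, 2]
num2 = [1, 7, 9, 4, 10, 15]

# intersection: common elements, with duplicates removed.
intersection = sorted(set(num) & set(num2))

# subtract: elements of the first list that are absent from the second, duplicates kept.
num2_values = set(num2)
num_minus_num2 = sorted(x for x in num if x not in num2_values)

num_values = set(num)
num2_minus_num = sorted(x for x in num2 if x not in num_values)

print(intersection)    # [4, 9]
print(num_minus_num2)  # [2, 2, 3, 5, 5]
print(num2_minus_num)  # [1, 7, 10, 15]
```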

## Distinct

In [23]:
num.collect()
num.distinct().collect()

[5, 4, 3, 2, 9]

## Key Value RDD 

In [24]:
data = sc.parallelize([(1,2),(3,4),(3,6),(3,4)])
data.collect()

[(1, 2), (3, 4), (3, 6), (3, 4)]

In [26]:
type(data)

pyspark.rdd.RDD

In [27]:
data.countByValue()

defaultdict(int, {(1, 2): 1, (3, 4): 2, (3, 6): 1})

In [28]:
dataStr = sc.parallelize([(1,'mike'),(2,'john'),(3,'rambo'),(4,'bill')])
dataStr.collect()

[(1, 'mike'), (2, 'john'), (3, 'rambo'), (4, 'bill')]

In [29]:
dataStr.countByValue()

defaultdict(int,
            {(1, 'mike'): 1, (2, 'john'): 1, (3, 'rambo'): 1, (4, 'bill'): 1})

In [30]:
data.sortByKey().collect()

[(1, 2), (3, 4), (3, 6), (3, 4)]

In [31]:
data.mapValues(lambda a : a*a).collect()

[(1, 4), (3, 16), (3, 36), (3, 16)]

## Reduce By Keys

`reduceByKey` reduces the values that share a key into a single value per key.

In [33]:
print(data.collect())
data.reduceByKey(lambda x, y : x+y).collect()

[(1, 2), (3, 4), (3, 6), (3, 4)]


[(1, 2), (3, 14)]

In [34]:
data.reduceByKey(max).collect()

[(1, 2), (3, 6)]
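
A plain-Python model of `reduceByKey` may help clarify what happens to values that share a key. The helper `reduce_by_key` is hypothetical; it models only the result, not the per-partition combining that a distributed engine would perform.

```python
def reduce_by_key(pairs, func):
    """Combine the values of each key with func, returning sorted (key, value) pairs."""
    acc = {}
    for key, value in pairs:
        acc[key] = func(acc[key], value) if key in acc else value
    return sorted(acc.items())


data = [(1, 2), (3, 4), (3, 6), (3, 4)]

print(reduce_by_key(data, lambda x, y: x + y))  # [(1, 2), (3, 14)]
print(reduce_by_key(data, max))                 # [(1, 2), (3, 6)]
```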

## Group By key 

In [35]:
# groupByKey: groups all the values that share a key into a single (key, iterable) pair.
result = data.groupByKey().collect()
result

[(1, <pyspark.resultiterable.ResultIterable at 0x7faa06b7ca90>),
 (3, <pyspark.resultiterable.ResultIterable at 0x7faa06b7c9d0>)]

In [36]:
for (k,v) in result:
    print(k, list(v))

1 [2]
3 [4, 6, 4]


## Flat Map Values

In [37]:
data.flatMapValues(lambda x: range(1, x)).collect()

[(1, 1),
 (3, 1),
 (3, 2),
 (3, 3),
 (3, 1),
 (3, 2),
 (3, 3),
 (3, 4),
 (3, 5),
 (3, 1),
 (3, 2),
 (3, 3)]

## Join 

In [38]:
data.collect()

[(1, 2), (3, 4), (3, 6), (3, 4)]

In [39]:
data2 = sc.parallelize([(3,9),(4,15)])
data2.collect()

[(3, 9), (4, 15)]

In [40]:
data.join(data2).collect()

[(3, (4, 9)), (3, (6, 9)), (3, (4, 9))]
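
The result above is an inner join: keys present in only one of the two RDDs (1 and 4 here) are dropped, and every matching pair of values is emitted. A plain-Python sketch of these semantics, with a hypothetical helper `inner_join`:

```python
from collections import defaultdict


def inner_join(left, right):
    """Pair every left value with every right value that shares its key."""
    right_by_key = defaultdict(list)
    for key, value in right:
        right_by_key[key].append(value)
    return [(key, (lv, rv)) for key, lv in left for rv in right_by_key.get(key, [])]


data = [(1, 2), (3, 4), (3, 6), (3, 4)]
data2 = [(3, 9), (4, 15)]

print(inner_join(data, data2))
# [(3, (4, 9)), (3, (6, 9)), (3, (4, 9))]
```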