# Counting the number of lines with 'a' and 'b' using DataFrames

To run this notebook as a batch job (for example with `spark-submit`), you must first create the SparkContext and SparkSession yourself:

In [None]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import sys  # NOTE(review): not used by any cell in this notebook

# Batch-mode setup; interactive PySpark kernels usually predefine `sc` and `spark`.
conf = SparkConf()
# getOrCreate (instead of SparkContext(conf=conf)) reuses an already running context,
# so re-running this cell does not fail with "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(conf=conf)
# The builder attaches to the active SparkContext obtained above.
spark = SparkSession.builder.getOrCreate()

In an interactive PySpark shell or notebook kernel, `sc` and `spark` are usually already defined, so you can skip the cell above.

In [1]:
# Load the text file as a one-column DataFrame (column `value`, one row per line) and cache it,
# because several of the following cells scan it again.
inputData = spark.read.text('spark/mary.txt').cache()

In [2]:
# The DataFrame repr shows only its schema, not its rows.
inputData

DataFrame[value: string]

In [3]:
# Column names: spark.read.text produced a single `value` column.
inputData.columns

['value']

In [4]:
# Count the lines containing each letter; every count() is a separate action over the cached lines.
numAs, numBs = (
    inputData.filter(inputData.value.contains(letter)).count()
    for letter in ("a", "b")
)

print("Lines with a: %i, lines with b: %i" % (numAs, numBs))

Lines with a: 4, lines with b: 2


In [5]:
# Collect all rows to the driver as a pandas DataFrame (fine here: the file has only four lines).
inputData.toPandas()

Unnamed: 0,value
0,Mary had a little lamb
1,its fleece was white as snow
2,and everywhere that Mary went
3,the lamb was sure to go


In [6]:
# collect() returns the rows to the driver as a list of Row objects.
inputData.collect()

[Row(value='Mary had a little lamb'),
 Row(value='its fleece was white as snow'),
 Row(value='and everywhere that Mary went'),
 Row(value='the lamb was sure to go')]

In [7]:
# Access the DataFrame's underlying RDD (its elements are Row objects).
# NOTE(review): a descriptive name such as inputRdd would read better; the next cell displays `a`.
a = inputData.rdd

In [8]:
# Displaying an RDD shows only a description of it, not its contents.
a

MapPartitionsRDD[25] at javaToPython at NativeMethodAccessorImpl.java:0

# Counting word occurrences

In [10]:
# Import only the column functions this notebook uses instead of `*`: the wildcard import
# silently shadows many Python builtins (sum, max, min, abs, round, filter, ...).
# NOTE: the imported sum and max still shadow the builtins of the same name for the rest of the session.
from pyspark.sql.functions import explode, split, mean, sum, max

In [11]:
# Split every line on runs of whitespace, emit one row per word, then count rows per word.
# r"\s+" is a raw string: in a plain literal "\s" is an invalid escape sequence
# (DeprecationWarning, and SyntaxWarning since Python 3.12).
records = (
    inputData
    .select(explode(split(inputData.value, r"\s+")).alias("words"))
    .groupBy("words")
    .count()
)

In [12]:
# Word counts as pandas; the row order of a groupBy result is not guaranteed.
records.toPandas()

Unnamed: 0,words,count
0,Mary,2
1,everywhere,1
2,sure,1
3,fleece,1
4,was,2
5,had,1
6,white,1
7,go,1
8,its,1
9,the,1


# Getting help

In [13]:
# Print the docstring of a pyspark.sql.functions function.
help(explode)

Help on function explode in module pyspark.sql.functions:

explode(col)
    Returns a new row for each element in the given array or map.
    
    >>> from pyspark.sql import Row
    >>> eDF = spark.createDataFrame([Row(a=1, intlist=[1,2,3], mapfield={"a": "b"})])
    >>> eDF.select(explode(eDF.intlist).alias("anInt")).collect()
    [Row(anInt=1), Row(anInt=2), Row(anInt=3)]
    
    >>> eDF.select(explode(eDF.mapfield).alias("key", "value")).show()
    +---+-----+
    |key|value|
    +---+-----+
    |  a|    b|
    +---+-----+
    
    .. versionadded:: 1.4



In [14]:
# Print the docstring of a DataFrame method.
help(inputData.select)

Help on method select in module pyspark.sql.dataframe:

select(*cols) method of pyspark.sql.dataframe.DataFrame instance
    Projects a set of expressions and returns a new :class:`DataFrame`.
    
    :param cols: list of column names (string) or expressions (:class:`Column`).
        If one of the column names is '*', that column is expanded to include all columns
        in the current DataFrame.
    
    >>> df.select('*').collect()
    [Row(age=2, name='Alice'), Row(age=5, name='Bob')]
    >>> df.select('name', 'age').collect()
    [Row(name='Alice', age=2), Row(name='Bob', age=5)]
    >>> df.select(df.name, (df.age + 10).alias('age')).collect()
    [Row(name='Alice', age=12), Row(name='Bob', age=15)]
    
    .. versionadded:: 1.3



# Converting between RDD and DataFrame

In [15]:
from pyspark.sql import Row

In [16]:
# RDD API: read the file through the SparkContext as an RDD of line strings.
lines = sc.textFile("spark/sql/people.txt")

In [17]:
# Each element is one raw "name, age" line of the file.
lines.collect()

['Michael, 29', 'Andy, 30', 'Justin, 19']

In [18]:
# Split each "name, age" line on the comma; the age part keeps its leading space (' 29').
parts = lines.map(lambda line: line.split(","))

In [19]:
# Each element is a [name, age] list of strings; the age strings start with a space.
parts.collect()

[['Michael', ' 29'], ['Andy', ' 30'], ['Justin', ' 19']]

In [20]:
# Turn each [name, age] pair into a Row; int() tolerates the leading space in ' 29'.
people = parts.map(lambda fields: Row(name=fields[0], age=int(fields[1])))

In [21]:
# The Rows print age before name although they were built as name, age:
# Row sorted its keyword fields alphabetically in this Spark version.
people.collect()

[Row(age=29, name='Michael'),
 Row(age=30, name='Andy'),
 Row(age=19, name='Justin')]

In [22]:
# Infer a DataFrame schema from the RDD of Rows (age becomes bigint, name string).
schemaPeople = spark.createDataFrame(people)

In [23]:
# Schema only: the Python ints in `age` were inferred as bigint.
schemaPeople

DataFrame[age: bigint, name: string]

In [24]:
# Collect the (small) DataFrame to pandas for display.
schemaPeople.toPandas()

Unnamed: 0,age,name
0,29,Michael
1,30,Andy
2,19,Justin


# Using SQL with Spark DataFrames

In [25]:
# Register the DataFrame as a temporary view named "people" so SQL queries can refer to it
# (replacing any existing view with that name).
schemaPeople.createOrReplaceTempView("people")

In [26]:
# Query the temporary view with plain SQL; the result is another DataFrame.
teenQuery = "SELECT name FROM people WHERE age >= 13 AND age <= 19"
teenagers = spark.sql(teenQuery)

In [27]:
# The SQL result is a DataFrame containing only the selected `name` column.
teenagers

DataFrame[name: string]

In [28]:
# Only Justin (age 19) falls in the 13-19 range.
teenagers.toPandas()

Unnamed: 0,name
0,Justin


In [29]:
# Bring the (small) result rows to the driver and prefix each name.
teenNames = ["Name: " + row.name for row in teenagers.collect()]

In [30]:
# A plain Python list of formatted strings.
teenNames

['Name: Justin']

# Reading a JSON file

In [31]:
# Read JSON records into a DataFrame; the schema (age: long, name: string) is inferred from the data.
df = spark.read.json("spark/sql/people.json")

In [32]:
# show() prints the rows as a text table; the missing age appears as null.
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [33]:
# Both inferred columns are nullable; Michael's age is null in the data.
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [34]:
# Project a single column by name.
df.select("name").show()

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+



In [35]:
# Column arithmetic propagates nulls: Michael's missing age stays null in (age + 1).
df.select(df['name'], df['age'] + 1).show()

+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+



In [36]:
# Rows with a null age are dropped, because null > 21 is not true.
df.filter(df['age'] > 21).show()

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+



In [37]:
# Count rows per age; rows with a null age form their own group.
df.groupBy("age").count().show()

+----+-----+
| age|count|
+----+-----+
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+



# Creating a DataFrame from lists

In [38]:
# Build a small DataFrame from in-memory rows: column names come from `schema`,
# column types are inferred from the values.
df = spark.createDataFrame(
    [
        ("one", 1, -5.3),
        ("two", 3, 3.9),
    ],
    schema=["a", "b", "c"],
)

In [39]:
# Display the new DataFrame.
df.show()

+---+---+----+
|  a|  b|   c|
+---+---+----+
|one|  1|-5.3|
|two|  3| 3.9|
+---+---+----+



# Adding a derived column

In [40]:
# Add d = b + 10; withColumn returns a new DataFrame, so rebind df to it.
df = df.withColumn("d", df["b"] + 10)

In [41]:
# The derived column d appears at the end.
df.show()

+---+---+----+---+
|  a|  b|   c|  d|
+---+---+----+---+
|one|  1|-5.3| 11|
|two|  3| 3.9| 13|
+---+---+----+---+



In [42]:
# Python ints were inferred as long and floats as double; d (b + 10) is long too.
df.printSchema()

root
 |-- a: string (nullable = true)
 |-- b: long (nullable = true)
 |-- c: double (nullable = true)
 |-- d: long (nullable = true)



In [43]:
# Summary statistics per column: mean/stddev are null for the string column a,
# while min/max for a compare the strings.
df.describe().show()

+-------+----+------------------+-----------------+------------------+
|summary|   a|                 b|                c|                 d|
+-------+----+------------------+-----------------+------------------+
|  count|   2|                 2|                2|                 2|
|   mean|null|               2.0|             -0.7|              12.0|
| stddev|null|1.4142135623730951|6.505382386916237|1.4142135623730951|
|    min| one|                 1|             -5.3|                11|
|    max| two|                 3|              3.9|                13|
+-------+----+------------------+-----------------+------------------+



# Writing a DataFrame to a file

In [45]:
!hdfs dfs -rm -r myframe.csv

2019-12-30 22:07:58,861 INFO  [main] fs.TrashPolicyDefault (TrashPolicyDefault.java:moveToTrash(182)) - Moved: 'hdfs://nameservice1/user/ivy2/myframe.csv' to trash at: hdfs://nameservice1/user/ivy2/.Trash/Current/user/ivy2/myframe.csv


In [46]:
# Write the DataFrame as CSV (a directory of part files, no header row).
# mode("overwrite") keeps the cell re-runnable even when myframe.csv already exists;
# the default mode fails in that case.
df.write.mode("overwrite").csv("myframe.csv")

In [47]:
# The CSV "file" is a directory: one part file per partition plus a _SUCCESS marker.
!hdfs dfs -ls myframe.csv

Found 3 items
-rw-r--r--   3 ivy2 ivy2          0 2019-12-30 22:08 myframe.csv/_SUCCESS
-rw-r--r--   3 ivy2 ivy2         14 2019-12-30 22:08 myframe.csv/part-00000-403fdcf5-c842-46cd-97cb-85feda60480d-c000.csv
-rw-r--r--   3 ivy2 ivy2         13 2019-12-30 22:08 myframe.csv/part-00001-403fdcf5-c842-46cd-97cb-85feda60480d-c000.csv


In [48]:
# Concatenate the part files; note that no header row was written.
!hdfs dfs -cat myframe.csv/part*

one,1,-5.3,11
two,3,3.9,13


# Casting a column

In [49]:
from pyspark.sql.types import IntegerType

# Narrow b from long to a 32-bit integer; withColumn with an existing name replaces that column.
bAsInt = df["b"].cast(IntegerType())
df = df.withColumn("b", bAsInt)

In [50]:
# b is now integer; d keeps its long type because it was computed before the cast.
df.printSchema()

root
 |-- a: string (nullable = true)
 |-- b: integer (nullable = true)
 |-- c: double (nullable = true)
 |-- d: long (nullable = true)



# Union of frames

In [51]:
# Two more rows with the same column names and order as df, to be unioned below.
df1 = spark.createDataFrame(
    [
        ("three", 111, -55.39, 17),
        ("four", 34, 31.9, 25),
    ],
    schema=["a", "b", "c", "d"],
)

In [52]:
# Append df1's rows to df. union matches columns by position, not by name; both frames list
# a, b, c, d in the same order here.
# NOTE(review): unionByName would guard against frames whose column order differs.
df2 = df.union(df1)

In [53]:
# df1's rows follow df's rows.
df2.toPandas()

Unnamed: 0,a,b,c,d
0,one,1,-5.3,11
1,two,3,3.9,13
2,three,111,-55.39,17
3,four,34,31.9,25


# Joining frames

In [54]:
# Lookup frame whose key column x matches some values of df2.a.
df3 = spark.createDataFrame(
    [
        ("one", 11),
        ("four", 534),
    ],
    schema=["x", "y"],
)

In [55]:
# Join on an explicit column-equality condition (default join type).
df4 = df3.join(df2, on=df3["x"] == df2["a"])

In [56]:
# Only keys present in both frames (one, four) remain; both key columns x and a are kept.
df4.toPandas()

Unnamed: 0,x,y,a,b,c,d
0,four,534,four,34,31.9,25
1,one,11,one,1,-5.3,11


# Projection

In [57]:
# Keep only the x and c columns.
df5 = df4.select("x", "c")

In [58]:
# Only the projected x and c columns remain.
df5.toPandas()

Unnamed: 0,x,c
0,four,31.9
1,one,-5.3


# Aggregation

In [59]:
# Keys with repeated values in column a, so the aggregations below have groups to combine.
df6 = spark.createDataFrame(
    [
        ("one", 1),
        ("two", 2),
        ("one", 3),
        ("two", 5),
        ("three", 39),
    ],
    schema=["a", "b"],
)

In [60]:
# Display the input rows before aggregating.
df6.toPandas()

Unnamed: 0,a,b
0,one,1
1,two,2
2,one,3
3,two,5
4,three,39


In [61]:
# Number of rows per value of a (groupBy spelled consistently with the other cells;
# groupby is an alias for it).
df6.groupBy("a").count().toPandas()

Unnamed: 0,a,count
0,two,2
1,one,2
2,three,1


In [62]:
# Average of b per value of a; GroupedData.mean names the result column avg(b).
df6.groupBy("a").mean("b").toPandas()

Unnamed: 0,a,avg(b)
0,two,3.5
1,one,2.0
2,three,39.0


In [63]:
# Sum of b per value of a, producing the column sum(b).
# GroupedData.sum is used instead of agg(sum("b")): the unqualified sum only worked because a
# wildcard import had shadowed Python's builtin sum.
df6.groupBy("a").sum("b").toPandas()

Unnamed: 0,a,sum(b)
0,two,7
1,one,4
2,three,39


In [64]:
# Maximum of b per value of a, producing the column max(b).
# GroupedData.max is used instead of agg(max("b")): the unqualified max only worked because a
# wildcard import had shadowed Python's builtin max.
df6.groupBy("a").max("b").toPandas()

Unnamed: 0,a,max(b)
0,two,5
1,one,3
2,three,39


In [65]:
# groupby is an alias of groupBy, so this prints groupBy's help.
help(df6.groupby)

Help on method groupBy in module pyspark.sql.dataframe:

groupBy(*cols) method of pyspark.sql.dataframe.DataFrame instance
    :func:`groupby` is an alias for :func:`groupBy`.
    
    .. versionadded:: 1.4



In [None]:
# Same help text as for groupby (its alias); this cell was never executed.
help(df6.groupBy)