In [None]:
######### RDD ##############
# Count the number of employees at each salary

In [1]:
import pyspark
from pyspark import SparkContext

In [2]:
# Create the SparkContext, or reuse the one already active in this kernel.
# A bare SparkContext() fails when a context already exists, so re-running this
# cell would break; getOrCreate() makes the cell safe to run more than once.
sc = SparkContext.getOrCreate()

In [3]:
# Load the employee file as an RDD of text lines, one "'name',salary" record per line.
# NOTE(review): `input` shadows Python's builtin input(); later cells depend on this name.
input = sc.textFile(name='/user/bigcdac432511/emp.txt')

In [4]:
# Display the RDD's repr (source path and lineage id); no records are shown.
input

/user/bigcdac432511/emp.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

In [5]:
# top(2) returns the 2 largest lines in descending string order, NOT the first two
# lines of the file (compare with take(2) on the same file further down).
input.top(2)

["'Yogita',80000", "'Twinkle',60000"]

In [6]:
# Extract the salary field from each "'name',salary" line.
# rsplit(',', 1) takes the text after the LAST comma, so a name that itself contains
# a comma does not shift the field. strip() drops stray whitespace or '\r', so values
# like '50000' and '50000\r' are not counted as different salaries later.
# A line with no comma still raises IndexError, as before.
salaries = input.map(lambda line: line.rsplit(',', 1)[1].strip())

In [8]:
# Two largest salary values. They are strings, so the ordering is lexicographic, not numeric.
salaries.top(2)

['80000', '60000']

In [9]:
# Count how many employees have each distinct salary string.
# The result is a local {salary: count} mapping on the driver (a defaultdict, per the output below).
results = salaries.countByValue()

In [10]:
# Inspect the salary -> employee-count mapping.
results

defaultdict(int, {'50000': 1, '40000': 1, '35000': 1, '60000': 1, '80000': 1})

In [11]:
# Print one "<salary> <count>" line per distinct salary, in the mapping's iteration order.
for salary, n_employees in results.items():
    print(f'{salary} {n_employees}')

50000 1
40000 1
35000 1
60000 1
80000 1


In [12]:
############# Basic DataFrame code ##############

In [13]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *

In [14]:
# Build the SparkSession, or reuse an existing one via getOrCreate().
# NOTE(review): this option key/value looks like a documentation placeholder; confirm it is needed.
spark = (
    SparkSession.builder
    .config('spark.some.config.option', 'some-value')
    .getOrCreate()
)

In [15]:
# Display the SparkSession object.
spark

In [16]:
################ JSON file is stored in HDFS (HUE) ##############

In [17]:
# Read a JSON Lines file (one JSON object per line) from HDFS; Spark infers the schema.
# Other built-in formats use the same reader API, e.g. spark.read.csv(path, header=True)
# or spark.read.parquet(path). Excel is NOT a built-in source and needs an extra package.
df = spark.read.json('/user/bigcdac432511/people.json')

In [18]:
# Displaying the DataFrame shows only its column names and types, not its rows.
df

DataFrame[age: bigint, name: string]

In [19]:
# Print the rows as a text table; a missing age is shown as null.
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [20]:
# Print the inferred schema as a tree (JSON integers were inferred as long).
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [21]:
# Fetch the first 3 rows to the driver as a list of Row objects.
df.head(3)

[Row(age=None, name='Michael'),
 Row(age=30, name='Andy'),
 Row(age=19, name='Justin')]

In [22]:
# Project just the name column.
df.select('name').show()

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+



In [28]:
# Show each name next to age + 1; a null age stays null in the computed column.
df.select('name', df['age'] + 1).show()

+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+



In [33]:
# Same projection as above, kept as a DataFrame so it can be inspected separately.
df2 = df.select('name', df['age'] + 1)

In [34]:
# The computed column is named after its expression: "(age + 1)".
df2

DataFrame[name: string, (age + 1): bigint]

In [35]:
# Print df2's rows.
df2.show()

+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+



In [36]:
# Keep rows with age > 18; rows whose age is null are dropped too.
df.where(df['age'] > 18).show()

+---+------+
|age|  name|
+---+------+
| 30|  Andy|
| 19|Justin|
+---+------+



In [37]:
# Number of rows per distinct age; null ages form their own group.
df.groupBy(df['age']).count().show()

+----+-----+
| age|count|
+----+-----+
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+



In [38]:
############### Register the DataFrame as a SQL temporary view ##############

In [39]:
# Register df under the SQL name 'people'.
# NOTE: a later cell registers schemaPeople under the same name 'people', replacing this view.
df.createOrReplaceTempView('people')

In [40]:
# Query the 'people' view; the result is a new DataFrame with the same schema as df.
sqlDF = spark.sql('select * from people')

In [41]:
# Display the query result's schema.
sqlDF

DataFrame[age: bigint, name: string]

In [42]:
# Print the query result's rows.
sqlDF.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [43]:
# SparkContext: RDD API

In [44]:
# Get the SparkContext behind the session. This is presumably the same context created
# at the top of the notebook, because getOrCreate() reuses an active one (confirm).
sc = spark.sparkContext

In [45]:
# Re-read emp.txt as an RDD of text lines.
lines = sc.textFile('/user/bigcdac432511/emp.txt')

In [46]:
# take(2) returns the first two lines in file order (unlike top(2), which sorts).
lines.take(2)

["'Tushar',50000", "'Suraj',40000"]

In [47]:
# Split each "'name',salary" line into a [name, salary] list of strings.
parts = lines.map(lambda line: line.split(','))

In [48]:
# top(2) compares the lists element-wise, so the ordering is by the name string, descending.
parts.top(2)

[["'Yogita'", '80000'], ["'Twinkle'", '60000']]

In [49]:
############### Programmatic Schema Example ###################

In [50]:
# Same rebinding as in the earlier RDD section. It is redundant, but keeps this
# section readable on its own.
sc = spark.sparkContext

In [51]:
# load a text file and convert each line to a row
lines = sc.textFile('/user/bigcdac432511/people.txt')
parts = lines.map(lambda l: l.split(','))
people = parts.map(lambda p: (p[0], p[1].strip()))   # strip-> removes all blank spaces (string)

In [52]:
# First two records in file order.
people.take(2)

[('Michael', '29'), ('Andy', '30')]

In [53]:
# Two largest (name, age) tuples in descending order; tuples are compared by name first.
people.top(2)

[('Michael', '29'), ('Justin', '19')]

In [54]:
# Display the RDD's repr (type and lineage id).
people

PythonRDD[67] at RDD at PythonRDD.scala:53

In [55]:
############## Define the schema by providing the data type of every column ############

In [65]:
# Two nullable string columns, in tuple order. age stays a string because the RDD
# holds the raw text values (e.g. '29').
schema = StructType([
    StructField(field_name, StringType(), True)
    for field_name in ('name', 'age')
])

In [66]:
# Display the schema definition.
schema

StructType(List(StructField(name,StringType,true),StructField(age,StringType,true)))

In [67]:
# Apply the schema to the RDD
# Apply the explicit schema to the RDD of (name, age) tuples.
schemaPeople = spark.createDataFrame(people, schema=schema)


In [68]:
# Print the rows built from people.txt; under this schema, age is a string column.
schemaPeople.show()

+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+



In [69]:
# Create a temporary view using the Dataframe
# Create a temporary view from this DataFrame.
# This replaces the earlier 'people' view that was built from the JSON DataFrame df.
schemaPeople.createOrReplaceTempView('people')

In [71]:
# SQL can be run over Dataframe that have been registered as a table
# SQL can be run over DataFrames registered as views. 'people' is now backed by
# schemaPeople, not the JSON DataFrame.
# NOTE: this rebinds `results`, which previously held the salary counts.
results = spark.sql('select name from people')

In [72]:
# Print the names returned by the SQL query.
results.show()

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+



In [None]:
# Stop the session (and its underlying SparkContext). Keep this as the last cell.
spark.stop()