In [14]:
from pyspark.sql import SQLContext
sqlContext =  SQLContext(sc)

In [87]:
df = sqlContext.read.json("test.json")
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [19]:
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [21]:
df.select(df.name).show()

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+



In [55]:
df.select(df.name,(df.age + 1).alias("age")).show()

+-----+---+
| name|age|
+-----+---+
|Akash| 27|
| Aman| 31|
+-----+---+



In [86]:
df.filter(df.age > 0 ).show()

+---+----+
|age|name|
+---+----+
+---+----+



How to filter null values? 

In [98]:
df.filter( df.age.isNotNull()).show()

+---+------+
|age|  name|
+---+------+
| 30|  Andy|
| 19|Justin|
+---+------+



In [57]:
df.filter(df.age == NaN).show()

NameError: name 'NaN' is not defined

In [37]:
df.groupBy(df.age).count().show()

+----+-----+
| age|count|
+----+-----+
|null|    1|
|  19|    1|
|  30|    1|
+----+-----+



### Create DF from RDDs

1

In [40]:
from pyspark.sql import Row

BaseRDD =  sc.parallelize([('Akash',26),('Aman',30)])
peopleRDD = BaseRDD.map(lambda x : Row(name=x[0],age=x[1]))

peopleRDD.collect()

[Row(age=26, name='Akash'), Row(age=30, name='Aman')]

In [41]:
df = sqlContext.createDataFrame(peopleRDD)
df.show()

+---+-----+
|age| name|
+---+-----+
| 26|Akash|
| 30| Aman|
+---+-----+



In [44]:
df2 = peopleRDD.toDF()
df2.show()

+---+-----+
|age| name|
+---+-----+
| 26|Akash|
| 30| Aman|
+---+-----+



In [45]:
df2.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [48]:
from pyspark.sql.types import *
schema = StructType([StructField('name',StringType()),StructField('age',IntegerType())])

df = sqlContext.createDataFrame(BaseRDD,schema)

df.show()

+-----+---+
| name|age|
+-----+---+
|Akash| 26|
| Aman| 30|
+-----+---+



In [49]:
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)



In [54]:
df.write.parquet("test")

AnalysisException: u'path file:/C:/Users/gradea/Desktop/MyCode/test.parquet already exists.;'

In [58]:
df = sqlContext.range(1,10)
df.show()

+---+
| id|
+---+
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
+---+



In [70]:
#sqlContext.registerFunction("STR_LEN",lambda x : len(x),IntegerType())

df=sqlContext.sql("select name,age from temp")
df.show()

+-----+---+
| name|age|
+-----+---+
|Akash| 26|
| Aman| 30|
+-----+---+



In [71]:
df2.registerTempTable("temp")

df=sqlContext.sql("select * from temp")
df.show()

+---+-----+
|age| name|
+---+-----+
| 26|Akash|
| 30| Aman|
+---+-----+



In [None]:
sqlContext.registerFunction('strlen',lambda x: len(x),IntegerType)

### JOIN 

In [84]:
df.select(df.name).distinct().show()

+-----+
| name|
+-----+
|Akash|
| Aman|
+-----+



In [83]:
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [99]:
df.explain()

== Physical Plan ==
Scan JSONRelation[age#31L,name#32] InputPaths: file:/C:/Users/gradea/Desktop/MyCode/test.json


In [103]:
df.flatMap(lambda x : x.name).collect()

[u'M',
 u'i',
 u'c',
 u'h',
 u'a',
 u'e',
 u'l',
 u'A',
 u'n',
 u'd',
 u'y',
 u'J',
 u'u',
 u's',
 u't',
 u'i',
 u'n']

In [107]:
df.agg?

In [109]:
rdd = sc.parallelize(range(1,10))


In [110]:
pyspark.sql.Window?