# SPARK DATA FRAME DOCUMENTATION

ELİF CANSU YILDIZ   
20.12.2018

In [2]:
# import pyspark class Row from module sql
from pyspark.sql import *
from pyspark.sql import functions
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.dataframe import *
import json

spark = SparkSession\
 .builder\
 .appName("DataFrameOrnegi")\
 .getOrCreate()

## Creating a dataframe

In [2]:
department1 = Row(id='123456', name='Computer Science')
department2 = Row(id='789012', name='Mechanical Engineering')
department3 = Row(id='345678', name='Theater and Drama')
department4 = Row(id='901234', name='Indoor Recreation')

# Create the Employees
Employee = Row("firstName", "lastName", "email", "salary")
employee1 = Employee('michael', 'armbrust', 'no-reply@berkeley.edu', 100000)
employee2 = Employee('xiangrui', 'meng', 'no-reply@stanford.edu', 120000)
employee3 = Employee('matei', None, 'no-reply@waterloo.edu', 140000)
employee4 = Employee(None, 'wendell', 'no-reply@berkeley.edu', 160000)

# Create the DepartmentWithEmployees instances from Departments and Employees
departmentWithEmployees1 = Row(department=department1, employees=[employee1, employee2])
departmentWithEmployees2 = Row(department=department2, employees=[employee3, employee4])
departmentWithEmployees3 = Row(department=department3, employees=[employee1, employee4])
departmentWithEmployees4 = Row(department=department4, employees=[employee2, employee3])

departmentsWithEmployeesSeq1 = [departmentWithEmployees1, departmentWithEmployees2]
df1 = spark.createDataFrame(departmentsWithEmployeesSeq1)
df1.show()

departmentsWithEmployeesSeq2 = [departmentWithEmployees3, departmentWithEmployees4]
df2 = spark.createDataFrame(departmentsWithEmployeesSeq2)
df2.show()

#in basic form
df = spark.createDataFrame([('value1_1','value2_2'),('value2_1','value2_2')], ['column1','column2'])
df = spark.createDataFrame([('value1_1','value2_2'),('value2_1','value2_2')], ('column1','column2'))
df.show()

df = spark.createDataFrame([('Spark SQL',)], ['data'])
df.show()

+--------------------+--------------------+
|          department|           employees|
+--------------------+--------------------+
|[123456, Computer...|[[michael, armbru...|
|[789012, Mechanic...|[[matei,, no-repl...|
+--------------------+--------------------+

+--------------------+--------------------+
|          department|           employees|
+--------------------+--------------------+
|[345678, Theater ...|[[michael, armbru...|
|[901234, Indoor R...|[[xiangrui, meng,...|
+--------------------+--------------------+

+--------+--------+
| column1| column2|
+--------+--------+
|value1_1|value2_2|
|value2_1|value2_2|
+--------+--------+

+---------+
|     data|
+---------+
|Spark SQL|
+---------+



## Printing some values and Using Union

In [3]:
print(department1)
print(employee2)
print(departmentWithEmployees1.employees[0].email)
print(departmentWithEmployees1)

departmentWithEmployees = Row("department", "employees")

dptWithEmp = departmentWithEmployees(department1, [employee1,employee2])
print(dptWithEmp)

unionDF = df1.union(df2)   #unionDF = df1.unionAll(df2)
unionDF.show()

Row(id='123456', name='Computer Science')
Row(firstName='xiangrui', lastName='meng', email='no-reply@stanford.edu', salary=120000)
no-reply@berkeley.edu
Row(department=Row(id='123456', name='Computer Science'), employees=[Row(firstName='michael', lastName='armbrust', email='no-reply@berkeley.edu', salary=100000), Row(firstName='xiangrui', lastName='meng', email='no-reply@stanford.edu', salary=120000)])
Row(department=Row(id='123456', name='Computer Science'), employees=[Row(firstName='michael', lastName='armbrust', email='no-reply@berkeley.edu', salary=100000), Row(firstName='xiangrui', lastName='meng', email='no-reply@stanford.edu', salary=120000)])
+--------------------+--------------------+
|          department|           employees|
+--------------------+--------------------+
|[123456, Computer...|[[michael, armbru...|
|[789012, Mechanic...|[[matei,, no-repl...|
|[345678, Theater ...|[[michael, armbru...|
|[901234, Indoor R...|[[xiangrui, meng,...|
+--------------------+-----------

## Using explode function

In [6]:
df = unionDF.select(functions.explode("employees").alias("e"))
explodeDF = df.selectExpr("e.firstName", "e.lastName", "e.email as e_mail", "e.salary")

explodeDF.show()

+---------+--------+--------------------+------+
|firstName|lastName|              e_mail|salary|
+---------+--------+--------------------+------+
|  michael|armbrust|no-reply@berkeley...|100000|
| xiangrui|    meng|no-reply@stanford...|120000|
|    matei|    null|no-reply@waterloo...|140000|
|     null| wendell|no-reply@berkeley...|160000|
|  michael|armbrust|no-reply@berkeley...|100000|
|     null| wendell|no-reply@berkeley...|160000|
| xiangrui|    meng|no-reply@stanford...|120000|
|    matei|    null|no-reply@waterloo...|140000|
+---------+--------+--------------------+------+



## Using Filter and Where Function

In [7]:
filterDF = explodeDF.filter(explodeDF.firstName == "xiangrui").sort(explodeDF.lastName)
print("\nfilterDF = \n")
filterDF.show()

# Use `|` instead of `or`
filterDF = explodeDF.filter((col("firstName") == "xiangrui") | (col("firstName") == "michael")).sort(asc("lastName"))
print("\nfilter with OR \n")
filterDF.show()
print("\nfilterDF type = ", type(filterDF))

whereDF = explodeDF.where((col("firstName") == "xiangrui") | (col("firstName") == "michael")).sort(asc("lastName"))
print("\nwhereDf = \n")
whereDF.show()

#Replace null values with -- using DataFrame Na function
nonNullDF = explodeDF.fillna("--")
print("\nnonNullDf = \n")
nonNullDF.show()

filterNonNullDF = explodeDF.filter(col("firstName").isNull() | col("lastName").isNull()).sort("e_mail")
print("\nfilternonNullDf = \n")
filterNonNullDF.show()
explodeDF.show()



filterDF = 

+---------+--------+--------------------+------+
|firstName|lastName|              e_mail|salary|
+---------+--------+--------------------+------+
| xiangrui|    meng|no-reply@stanford...|120000|
| xiangrui|    meng|no-reply@stanford...|120000|
+---------+--------+--------------------+------+


filter with OR 

+---------+--------+--------------------+------+
|firstName|lastName|              e_mail|salary|
+---------+--------+--------------------+------+
|  michael|armbrust|no-reply@berkeley...|100000|
|  michael|armbrust|no-reply@berkeley...|100000|
| xiangrui|    meng|no-reply@stanford...|120000|
| xiangrui|    meng|no-reply@stanford...|120000|
+---------+--------+--------------------+------+


filterDF type =  <class 'pyspark.sql.dataframe.DataFrame'>

whereDf = 

+---------+--------+--------------------+------+
|firstName|lastName|              e_mail|salary|
+---------+--------+--------------------+------+
|  michael|armbrust|no-reply@berkeley...|100000|
|  michael|

## Using GroupBy

In [10]:
groupByDf = explodeDF.groupBy("salary").max("salary")
groupByDf.show()

countDistinctDF = explodeDF.select("firstName", "lastName")\
  .groupBy("firstName", "lastName")\
  .agg(countDistinct("firstName"))
  #.select("firstName", "lastName", col("count(DISTINCT firstName)").alias("countDistinct"))
countDistinctDF.show()

countDistinctDF.printSchema()

+------+-----------+
|salary|max(salary)|
+------+-----------+
|120000|     120000|
|140000|     140000|
|100000|     100000|
|160000|     160000|
+------+-----------+

+---------+--------+-------------------------+
|firstName|lastName|count(DISTINCT firstName)|
+---------+--------+-------------------------+
|     null| wendell|                        0|
|    matei|    null|                        1|
| xiangrui|    meng|                        1|
|  michael|armbrust|                        1|
+---------+--------+-------------------------+

root
 |-- firstName: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- count(DISTINCT firstName): long (nullable = false)



## EXAMPLE:

In [74]:
example = Row(id="123456%", salary=1000)
example2= Row(id="456123%", salary=2000)
example3= Row(id="123456%", salary=3000)
example4= Row(id="456123%", salary=1500)

exampleSeq = [example,example2,example3,example4]
exmpDF = spark.createDataFrame(exampleSeq)
exmpDF.printSchema()
exmpDF.show()

root
 |-- id: string (nullable = true)
 |-- salary: long (nullable = true)

+-------+------+
|     id|salary|
+-------+------+
|123456%|  1000|
|456123%|  2000|
|123456%|  3000|
|456123%|  1500|
+-------+------+



In [67]:
exmpDF.withColumn("wage",exmpDF["salary"]).show()

exmpDF.withColumn("id", exmpDF["salary"].cast(LongType())).show()

exmpDF.groupBy("id").agg(max("salary").alias("max")).show()

exmpDF.groupBy("salary").max().alias("max").show()  #alias fonksiyonu bu şekilde işe yaramadı!!

#in order to change the column name of aggregate function:
exmpDF.groupBy("salary").max().select("salary", col("max(salary)").alias("max")).show()

exmpDF.groupBy("salary").max().withColumn("wage",exmpDF["salary"]).show()

+-------+------+----+
|     id|salary|wage|
+-------+------+----+
|123456%|  1000|1000|
|456123%|  2000|2000|
|123456%|  3000|3000|
|456123%|  1500|1500|
+-------+------+----+

+----+------+
|  id|salary|
+----+------+
|1000|  1000|
|2000|  2000|
|3000|  3000|
|1500|  1500|
+----+------+

+-------+----+
|     id| max|
+-------+----+
|123456%|3000|
|456123%|2000|
+-------+----+

+------+-----------+
|salary|max(salary)|
+------+-----------+
|  1000|       1000|
|  1500|       1500|
|  3000|       3000|
|  2000|       2000|
+------+-----------+

+------+----+
|salary| max|
+------+----+
|  1000|1000|
|  1500|1500|
|  3000|3000|
|  2000|2000|
+------+----+

+------+-----------+----+
|salary|max(salary)|wage|
+------+-----------+----+
|  1000|       1000|1000|
|  1500|       1500|1500|
|  3000|       3000|3000|
|  2000|       2000|2000|
+------+-----------+----+



In [66]:
exmpDF.withColumn("newColumn", expr("concat(substring(id,1,length(id)-1))").cast(DoubleType())).show()

+-------+------+---------+
|     id|salary|newColumn|
+-------+------+---------+
|123456%|  1000| 123456.0|
|456123%|  2000| 456123.0|
|123456%|  3000| 123456.0|
|456123%|  1500| 456123.0|
+-------+------+---------+



## Using length and concat and expr function

In [98]:
exmpDF.withColumn("newColumn",concat(substring(exmpDF["id"],1,6), lit("_"), substring(exmpDF["id"],7,7))).show() 

exmpDF.withColumn("length", length("id")).show() 

exmpDF2 = exmpDF.withColumn("newID", expr("concat(substring(id,1,length(id)-1))").cast(LongType()))
exmpDF2 = exmpDF2.groupBy("newID").avg("salary")
exmpDF2.show()


+-------+------+---------+
|     id|salary|newColumn|
+-------+------+---------+
|123456%|  1000| 123456_%|
|456123%|  2000| 456123_%|
|123456%|  3000| 123456_%|
|456123%|  1500| 456123_%|
+-------+------+---------+

+-------+------+------+
|     id|salary|length|
+-------+------+------+
|123456%|  1000|     7|
|456123%|  2000|     7|
|123456%|  3000|     7|
|456123%|  1500|     7|
+-------+------+------+

+------+-----------+
| newID|avg(salary)|
+------+-----------+
|123456|     2000.0|
|456123|     1750.0|
+------+-----------+



In [88]:
results = json.loads(exmpDF.toJSON().first())
for key in results:
    print (results[key])
    

123456%
1000


## EXAMPLE FROM SAMPLE KAFKA DATA:

In [98]:
datum1 = Row(container = "7ba51fd5c3ac", cpu = "0.03%", pids = 2)
datum2 = Row(container = "e8ccbc91d6dd", cpu = "9.17%", pids = 2)
datum3 = Row(container = "7ba51fd5c3ac", cpu = "15.07%", pids = 2)
datum4 = Row(container = "e8ccbc91d6dd", cpu = "3.49%", pids = 2)
datum5 = Row(container = "7ba51fd5c3ac", cpu = "4.79%", pids = 2)
datum6 = Row(container = "7ba51fd5c3ac", cpu = "15.07%", pids = 2)

dataSequence = [datum1, datum2, datum3, datum4, datum5, datum6]
df = spark.createDataFrame(dataSequence)
df.printSchema()

root
 |-- container: string (nullable = true)
 |-- cpu: string (nullable = true)
 |-- pids: long (nullable = true)



In [99]:
df.withColumn("newcpu", expr("concat(substring(cpu,1,length(cpu)-1))").cast(DoubleType())).groupBy("container").avg("newcpu").show()

query = df\
    .withColumn("newcpu", expr("concat(substring(cpu,1,length(cpu)-1))").cast(DoubleType()))\
    .groupBy("container")\
    .max('newcpu')

query.printSchema()
query.show()

+------------+-----------+
|   container|avg(newcpu)|
+------------+-----------+
|e8ccbc91d6dd|       6.33|
|7ba51fd5c3ac|       8.74|
+------------+-----------+

root
 |-- container: string (nullable = true)
 |-- max(newcpu): double (nullable = true)

+------------+-----------+
|   container|max(newcpu)|
+------------+-----------+
|e8ccbc91d6dd|       9.17|
|7ba51fd5c3ac|      15.07|
+------------+-----------+



In [101]:
query2 = query.select(col("max(newcpu)").alias("maxcpu").cast(StringType()), "container")

query2.show()
query2.printSchema()

query2 = query2.withColumn("jsonColumn", to_json(struct([query2[x] for x in query2.columns])))
query2.show()

query3 = df\
    .withColumn("newcpu", expr("concat(substring(cpu,1,length(cpu)-1))").cast(DoubleType()))\
    .agg(max("newcpu"), avg("newcpu"))\
    .select(col("avg(newcpu)").alias("avg"), col("max(newcpu)").alias("max"))
query3.show()

+------+------------+
|maxcpu|   container|
+------+------------+
|  9.17|e8ccbc91d6dd|
| 15.07|7ba51fd5c3ac|
+------+------------+

root
 |-- maxcpu: string (nullable = true)
 |-- container: string (nullable = true)

+------+------------+--------------------+
|maxcpu|   container|          jsonColumn|
+------+------------+--------------------+
|  9.17|e8ccbc91d6dd|{"maxcpu":"9.17",...|
| 15.07|7ba51fd5c3ac|{"maxcpu":"15.07"...|
+------+------------+--------------------+

+-----------------+-----+
|              avg|  max|
+-----------------+-----+
|7.936666666666667|15.07|
+-----------------+-----+



## Using join function

In [104]:
#df.withColumn("newcpu", expr("concat(substring(cpu,1,length(cpu)-1))").cast(DoubleType())).groupBy("container").avg("newcpu").show()

print("df2 :")
df2 = df.withColumn("newcpu", expr("concat(substring(cpu,1,length(cpu)-1))").cast(DoubleType()))
#df2.printSchema()
df2.show()

print("query2:")
query2 = query.select(col("max(newcpu)").alias("maxcpu").cast(DoubleType()), col("container").alias("newcontainer"))
query2 = query2.withColumn("jsonColumn", to_json(struct([query2[x] for x in query2.columns])))
#query2.printSchema()
query2.show()

print("using left_outer join:")
query3 = query2\
        .join(df2, (query2.newcontainer == df2.container) & (query2.maxcpu == df2.newcpu), 'left_outer' )\
        .dropDuplicates(['newcontainer','maxcpu'])
query3.select("maxcpu","jsonColumn","container").show()
#query3 = query2.join(df2, df2["container"] == query2["container"] & df2["newcpu"]==query2["maxcpu"], 'outer')

print("using inner join:")
query3 = query2\
        .join(df2, (query2.newcontainer == df2.container) & (query2.maxcpu == df2.newcpu), 'inner' )\
        .dropDuplicates(['newcontainer','maxcpu'])
query3.select("maxcpu","jsonColumn","container").show()


df2 :
+------------+------+----+------+
|   container|   cpu|pids|newcpu|
+------------+------+----+------+
|7ba51fd5c3ac| 0.03%|   2|  0.03|
|e8ccbc91d6dd| 9.17%|   2|  9.17|
|7ba51fd5c3ac|15.07%|   2| 15.07|
|e8ccbc91d6dd| 3.49%|   2|  3.49|
|7ba51fd5c3ac| 4.79%|   2|  4.79|
|7ba51fd5c3ac|15.07%|   2| 15.07|
+------------+------+----+------+

query2:
+------+------------+--------------------+
|maxcpu|newcontainer|          jsonColumn|
+------+------------+--------------------+
|  9.17|e8ccbc91d6dd|{"maxcpu":9.17,"n...|
| 15.07|7ba51fd5c3ac|{"maxcpu":15.07,"...|
+------+------------+--------------------+

using left_outer join:
+------+--------------------+------------+
|maxcpu|          jsonColumn|   container|
+------+--------------------+------------+
| 15.07|{"maxcpu":15.07,"...|7ba51fd5c3ac|
|  9.17|{"maxcpu":9.17,"n...|e8ccbc91d6dd|
+------+--------------------+------------+

using inner join:
+------+--------------------+------------+
|maxcpu|          jsonColumn|   container|


## Using udf (user defined function)

In [2]:
slen = udf(lambda s: len(s), IntegerType())
find_length = udf(lambda x: len(x), IntegerType())

dfExample = spark.createDataFrame([(1, "John Doe", 21, "hello")], ("id", "name", "age","word"))
dfExample.show()

@udf('integer')
# Input/output are both a single double value
def plus_one(v):
      return v+1 

@udf('string')
# Input/output are both a single double value
def store_it(v):
      return v 

dfExample.select(slen("name").alias("slenn(name)"), plus_one("age").alias("plusli")).show()
dfExample = dfExample.withColumn("v2", plus_one("age")).withColumn("v3",find_length("word"))


+---+--------+---+-----+
| id|    name|age| word|
+---+--------+---+-----+
|  1|John Doe| 21|hello|
+---+--------+---+-----+

+-----------+------+
|slenn(name)|plusli|
+-----------+------+
|          8|    22|
+-----------+------+



In [54]:
values = [(2, 1,"Alex"), (1,1,"Starle")]
columns = ["column1", "column2", "column3"]
df = spark.createDataFrame(values,columns)
df.show()

@udf('integer')
def calc_dif(x,y):
    if (x>y) and (x!=1):
        return int(x)-int(y)     #or   calc_dif = udf(calc_dif, IntegerType())
    
dfNew = df.withColumn("calc", lit(calc_dif("column1",df["column2"])))
dfNew.show()

+-------+-------+-------+
|column1|column2|column3|
+-------+-------+-------+
|      2|      1|   Alex|
|      1|      1| Starle|
+-------+-------+-------+

+-------+-------+-------+----+
|column1|column2|column3|calc|
+-------+-------+-------+----+
|      2|      1|   Alex|   1|
|      1|      1| Starle|null|
+-------+-------+-------+----+



## Using union

In [53]:
dfExample = spark.createDataFrame([(1, "John Doe", 21, "hello")], ("id", "name", "age","word"))
tmpDF = spark.createDataFrame([(1, "John Doe", 21, "helloooo", 23, 8)])
dfExample = dfExample.union(tmpDF)
dfExample.show()

firstDF = spark.range(3).toDF("myCol")
newRow = spark.createDataFrame([[20]])
appended = firstDF.union(newRow)
appended.show()

+---+--------+---+--------+---+---+
| id|    name|age|    word| v2| v3|
+---+--------+---+--------+---+---+
|  1|John Doe| 21|   hello| 22|  5|
|  1|John Doe| 21|helloooo| 23|  8|
+---+--------+---+--------+---+---+

+-----+
|myCol|
+-----+
|    0|
|    1|
|    2|
|   20|
+-----+



## Read From JSON file

In [92]:
jsonDF = spark.read.json("dosya.json")
jsonDF.printSchema()
jsonDF.where("pids == 2").select("container").show()

root
 |-- container: string (nullable = true)
 |-- cpu: string (nullable = true)
 |-- io: struct (nullable = true)
 |    |-- block: string (nullable = true)
 |    |-- network: string (nullable = true)
 |-- memory: struct (nullable = true)
 |    |-- percent: string (nullable = true)
 |    |-- raw: string (nullable = true)
 |-- pids: string (nullable = true)

+------------+
|   container|
+------------+
|7ba51fd5c3ac|
|e8ccbc91d6dd|
+------------+



## Using expr, when, selectExpr, if in expr

In [70]:
"""
def calc_dif2(x,y):
    when((x > y) & (x != 1), x - y)"""

df = spark.createDataFrame([(2, 0,"Alex"), (1,2,"Starle")],["column1", "column2", "column3"])
df.show()

dfNew = df.withColumn("calc", when(df["column1"]>df["column2"], df["column1"]-df["column2"]))
dfNew.show()

dfNew = df.withColumn("calc", expr("length(column3)"))
dfNew.show()

dfNew = df.withColumn("calc", expr("if (column2>0 , 4, if(column1 == 2, 5, 0))"))
dfNew.show()

dfNew = df.selectExpr("length(column3)", "abs(column1)")
dfNew.show()

new_column_1 = expr(
    """IF(fruit1 IS NULL OR fruit2 IS NULL, 3, IF(fruit1 = fruit2, 1, 0))"""
)

+-------+-------+-------+
|column1|column2|column3|
+-------+-------+-------+
|      2|      0|   Alex|
|      1|      2| Starle|
+-------+-------+-------+

+-------+-------+-------+----+
|column1|column2|column3|calc|
+-------+-------+-------+----+
|      2|      0|   Alex|   2|
|      1|      2| Starle|null|
+-------+-------+-------+----+

+-------+-------+-------+----+
|column1|column2|column3|calc|
+-------+-------+-------+----+
|      2|      0|   Alex|   4|
|      1|      2| Starle|   6|
+-------+-------+-------+----+

+-------+-------+-------+----+
|column1|column2|column3|calc|
+-------+-------+-------+----+
|      2|      0|   Alex|   5|
|      1|      2| Starle|   4|
+-------+-------+-------+----+

+---------------+------------+
|length(column3)|abs(column1)|
+---------------+------------+
|              4|           2|
|              6|           1|
+---------------+------------+



## Using concat, expr func:

In [96]:
df = spark.createDataFrame([('abcd','123')], ['s', 'd'])
df.withColumn("newColumn", concat(df.s, df.d)).show()   #df.withColumn("newColumn2", expr("concat(s, d)")).show()
df.select(expr("concat(s, d)").alias("cnlds")).show()   #df.select(concat(df.s, df.d).alias("cnlds")).show()

+----+---+---------+
|   s|  d|newColumn|
+----+---+---------+
|abcd|123|  abcd123|
+----+---+---------+

+-------+
|  cnlds|
+-------+
|abcd123|
+-------+



## Using window

In [36]:
from pyspark.sql import functions as F
from pyspark.sql import Window
from pyspark.sql import SparkSession

spark = SparkSession\
 .builder\
 .appName("DataFrameOrnegi")\
 .getOrCreate()

name_list = [(101, 'abc', 24), (102, 'cde', 24), (103, 'efg', 22), (104, 'ghi', 21),
             (105, 'ijk', 20), (106, 'klm', 19), (107, 'mno', 18), (108, 'pqr', 18),
             (109, 'rst', 26), (110, 'tuv', 27), (111, 'pqr', 18), (112, 'rst', 28), (113, 'tuv', 29)]


name_age_df = spark.createDataFrame(name_list, ['id', 'name', 'age'])
age_w = Window.partitionBy("age")
name_age_df.show()

name_age_count_df = name_age_df.withColumn("count", F.count("id").over(age_w)).orderBy("age")
name_age_count_df.show()

name_age_count_df_avg = name_age_count_df.groupBy().avg()
name_age_count_df_avg.show()

name_age_count_df_avg = name_age_count_df.groupBy().agg({"age":"mean"})
name_age_count_df_avg.show()


+---+----+---+
| id|name|age|
+---+----+---+
|101| abc| 24|
|102| cde| 24|
|103| efg| 22|
|104| ghi| 21|
|105| ijk| 20|
|106| klm| 19|
|107| mno| 18|
|108| pqr| 18|
|109| rst| 26|
|110| tuv| 27|
|111| pqr| 18|
|112| rst| 28|
|113| tuv| 29|
+---+----+---+

+---+----+---+-----+
| id|name|age|count|
+---+----+---+-----+
|107| mno| 18|    3|
|108| pqr| 18|    3|
|111| pqr| 18|    3|
|106| klm| 19|    1|
|105| ijk| 20|    1|
|104| ghi| 21|    1|
|103| efg| 22|    1|
|102| cde| 24|    2|
|101| abc| 24|    2|
|109| rst| 26|    1|
|110| tuv| 27|    1|
|112| rst| 28|    1|
|113| tuv| 29|    1|
+---+----+---+-----+

+-------+------------------+------------------+
|avg(id)|          avg(age)|        avg(count)|
+-------+------------------+------------------+
|  107.0|22.615384615384617|1.6153846153846154|
+-------+------------------+------------------+

+------------------+
|          avg(age)|
+------------------+
|22.615384615384617|
+------------------+



## Using to_json function

In [69]:
df = spark.createDataFrame([("10001","alex","30","75000"),("10002","bob","30","80000"),("10003","deb","30","80000"),("10004","john","30","85000"),("10005","sam","30","75000")],["id","name","age","salary"])
df.show()
#df.withColumn("jsonCol", to_json(struct([ when(col(x)!="  ",df[x]).otherwise(None) for x in df.columns]))).show()
df = df.withColumn("jsonCol", to_json(struct([when(F.col(x)!="  ",df[x]).otherwise(None).alias(x) for x in df.columns])))
df.show()
df.select(df.age.cast("string"),"name","jsonCol").show(truncate=False)

+-----+----+---+------+
|   id|name|age|salary|
+-----+----+---+------+
|10001|alex| 30| 75000|
|10002| bob| 30| 80000|
|10003| deb| 30| 80000|
|10004|john| 30| 85000|
|10005| sam| 30| 75000|
+-----+----+---+------+

+-----+----+---+------+--------------------+
|   id|name|age|salary|             jsonCol|
+-----+----+---+------+--------------------+
|10001|alex| 30| 75000|{"id":"10001","na...|
|10002| bob| 30| 80000|{"id":"10002","na...|
|10003| deb| 30| 80000|{"id":"10003","na...|
|10004|john| 30| 85000|{"id":"10004","na...|
|10005| sam| 30| 75000|{"id":"10005","na...|
+-----+----+---+------+--------------------+

+---+----+--------------------------------------------------------+
|age|name|jsonCol                                                 |
+---+----+--------------------------------------------------------+
|30 |alex|{"id":"10001","name":"alex","age":"30","salary":"75000"}|
|30 |bob |{"id":"10002","name":"bob","age":"30","salary":"80000"} |
|30 |deb |{"id":"10003","name":"deb",

## Using monotically_increasing function

In [None]:
df.withColumn("idx",monotonically_increasing_id().alias('id')).show()

## Using Explode function

In [11]:
#json sample = {"rules": [{"type": "CPU","conditions": [{"name": "maxLimit","value": "70","action": "CPU has been increased by 50 percent"}, {"name": "minLimit","value": "10","action": "CPU has been decreased by 50 percent"}]}, {"type": "RAM","conditions": [{"name": "maxLimit",	"value": "12","action": "RAM has been increased by 50 percent"}, {"name": "minLimit","value": "2","action": "RAM has been decreased by 50 percent"}]}]}
df = spark.read.json('rules.json')
df.printSchema()
"""df = df.withColumn("data", explode("rules")).selectExpr("data.*")
df.show()"""

df2 = df.select(explode("rules").alias("r"))
df2 = df2.selectExpr("r.*")
df2.show(truncate=False)

df3 = df2.select(explode("conditions").alias("c")).selectExpr("c.*")
df3.show(truncate=False)

df4 = df2.withColumn("c",explode("conditions")).selectExpr("type","c.*")
df4.show(truncate=False)

result = df3.select("action").where("value>=10")
result.show(truncate=False)

tmpDF = spark.createDataFrame([("aa",100,1500,11),("bb",101,1850,18),("cc",102,1375,3)],["name","id","salary","tmp_value"])
tmpDF.show(truncate=False)

#conc = df3.select("action").where("tmpDF.value > 10")
conc = df4.join(tmpDF, ((tmpDF.tmp_value>df4.value)) & (tmpDF.salary==1500)).select("action","value","tmp_value")
conc.show(truncate=False)

#SUMMARY:
df.select(explode("rules").alias("r"))\
    .selectExpr("r.*")\
    .withColumn("c",explode("conditions"))\
    .selectExpr("type","c.*")\
    .show(truncate=False)

root
 |-- rules: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- conditions: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- action: string (nullable = true)
 |    |    |    |    |-- name: string (nullable = true)
 |    |    |    |    |-- value: string (nullable = true)
 |    |    |-- type: string (nullable = true)

+------------------------------------------------------------------------------------------------------------+----+
|conditions                                                                                                  |type|
+------------------------------------------------------------------------------------------------------------+----+
|[[CPU has been increased by 50 percent, maxLimit, 70], [CPU has been decreased by 50 percent, minLimit, 10]]|CPU |
|[[RAM has been increased by 50 percent, maxLimit, 12], [RAM has been decreased by 50 percent, minLimit, 2]] |RAM |
+---

## Using regexp_extract function

In [19]:
# json sample = [{"eventTime":"2018-12-28 13:55:53.521274","container":"99f4e1ab4b98","memory":{"raw":"255.9MiB / 256MiB","percent":"99.98%"},"cpu":"8.09%","io":{"network":"0kB / 2B","block":"1.55GB / 1.81GB"},"pids":"2"}, {"eventTime":"2018-12-28 13:55:53.021234","container":"99f4e1ab4b98","memory":{"raw":"255.8MiB / 256MiB","percent":"99.94%"},"cpu":"3.50%","io":{"network":"3.33kB / 0B","block":"1.47GB / 1.73GB"},"pids":"2"}, {"eventTime":"2018-12-28 13:55:48.521993","container":"99f4e1ab4b98","memory":{"raw":"256MiB / 256MiB","percent":"99.98%"},"cpu":"8.58%","io":{"network":"3.12kB / 0B","block":"8GB / 1.34GB"},"pids":"2"}, {"eventTime":"2018-12-25 15:55:53.521274","container":"99f4e1ab4b98","memory":{"raw":"255.9MiB / 256MiB","percent":"99.98%"},"cpu":"8.09%","io":{"network":"0kB / 4.15B","block":"1.55GB / 8GB"},"pids":"2"}, {"eventTime":"2018-12-25 15:55:53.521274","container":"99f4e1ab4b98","memory":{"raw":"255.9MiB / 256MiB","percent":"99.98%"},"cpu":"8.09%","io":{"network":"6kB / 4.15B","block":"7GB / 3GB"},"pids":"2"}]
rawdeneme = spark.read.json("exampleFile.json")
rawdeneme = rawdeneme.alias("json_value")\
        .selectExpr(
        "json_value.eventTime",
        "cast (json_value.container as string)",
        "json_value.memory",
        "cast (json_value.cpu as string)",
        "json_value.io",
        "cast (json_value.pids as integer)"
    )
rawdeneme.show()

deneme = rawdeneme\
    .withColumn("cpu", expr("substring(cpu,1,length(cpu)-1)").cast(DoubleType()))\
    .withColumn("memory_per", col("memory.percent"))\
    .withColumn("netw_i", regexp_extract("io.network", '(\d+).(\d+)(\w+)|(\d+)', 0))\
    .withColumn("netw_o", regexp_extract("io.network", '(.)(\d+).(\d+)|(.)(\d+)', 0))\
    .withColumn("block_i", regexp_extract("io.block",'(\d+).(\d+)|(/d+)', 0))\
    .withColumn("block_o", regexp_extract("io.block",'(.)(\d+).(\d+)', 0))\
    .withColumn("network",concat(col("netw_i"),lit("-"),col("netw_o")))\
    .select("container","cpu",col("memory_per").alias("mem_usg"),"netw_i","block_i","block_o","pids","eventTime","network")
deneme.show(truncate=False)

#https://stackoverflow.com/questions/46410887/pyspark-string-matching-to-create-new-column

+--------------------+------------+--------------------+-----+--------------------+----+
|           eventTime|   container|              memory|  cpu|                  io|pids|
+--------------------+------------+--------------------+-----+--------------------+----+
|2018-12-28 13:55:...|99f4e1ab4b98|[99.98%, 255.9MiB...|8.09%|[1.55GB / 1.81GB,...|   2|
|2018-12-28 13:55:...|99f4e1ab4b98|[99.94%, 255.8MiB...|3.50%|[1.47GB / 1.73GB,...|   2|
|2018-12-28 13:55:...|99f4e1ab4b98|[99.98%, 256MiB /...|8.58%|[8GB / 1.34GB, 3....|   2|
|2018-12-25 15:55:...|99f4e1ab4b98|[99.98%, 255.9MiB...|8.09%|[1.55GB / 8GB, 0k...|   2|
|2018-12-25 15:55:...|99f4e1ab4b98|[99.98%, 255.9MiB...|8.09%|[7GB / 3GB, 6kB /...|   2|
+--------------------+------------+--------------------+-----+--------------------+----+

+------------+----+-------+------+-------+-------+----+--------------------------+----------+
|container   |cpu |mem_usg|netw_i|block_i|block_o|pids|eventTime                 |network   |
+---------

## Reference: 

https://docs.databricks.com/spark/latest/dataframes-datasets/introduction-to-dataframes-python.html

http://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html#pyspark.sql.Window

Göz atılabilir: 

https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html

https://stackoverflow.com/questions/38549/what-is-the-difference-between-inner-join-and-outer-join?rq=1

In [13]:
%%bash
start-all.sh

This script is Deprecated. Instead use start-dfs.sh and start-yarn.sh
Starting namenodes on [localhost]
localhost: starting namenode, logging to /home/ubuntu/hadoop/logs/hadoop-ubuntu-namenode-master.out
localhost: starting datanode, logging to /home/ubuntu/hadoop/logs/hadoop-ubuntu-datanode-master.out
Starting secondary namenodes [0.0.0.0]
0.0.0.0: starting secondarynamenode, logging to /home/ubuntu/hadoop/logs/hadoop-ubuntu-secondarynamenode-master.out
starting yarn daemons
starting resourcemanager, logging to /home/ubuntu/hadoop/logs/yarn-ubuntu-resourcemanager-master.out
localhost: starting nodemanager, logging to /home/ubuntu/hadoop/logs/yarn-ubuntu-nodemanager-master.out
