In [2]:
import findspark
findspark.init("/usr/local/spark")
import pyspark
from pyspark.sql import SparkSession

In [3]:
# sc = pyspark.SparkContext()

## Set up the Spark session
**Prior to Spark 2.0, SparkContext was the entry point of any Spark application.
It gave access to all Spark features and required a SparkConf object
holding the cluster configuration and parameters.
SparkContext could primarily create only RDDs,
and every other kind of Spark interaction needed its own context:
SQLContext for SQL, HiveContext for Hive, and StreamingContext for streaming.
In a nutshell, SparkSession combines all of these contexts.
Internally, SparkSession creates a new SparkContext for all operations,
and the contexts mentioned above can be accessed through the SparkSession object.**

In [11]:
spark = SparkSession.builder.appName("SparkTest").master("local[4]").getOrCreate()
print(spark.version)

3.0.0-preview2


### RDD 
RDD stands for **Resilient Distributed Dataset**. An RDD in Spark is simply an immutable distributed
collection of objects. Each RDD is split into multiple partitions (smaller subsets of the same data),
which may be computed on different nodes of the cluster.

## Create a DataFrame
 - **From csv file** 

In [32]:
df = spark.read.format("csv").load("data_csv/Adv.csv", header=True)
df.show(25)
df.printSchema()

+---+-----+-----+---------+-----+
|_c0|   TV|Radio|Newspaper|Sales|
+---+-----+-----+---------+-----+
|  1|230.1| 37.8|     69.2| 22.1|
|  2| 44.5| 39.3|     45.1| 10.4|
|  3| 17.2| 45.9|     69.3|  9.3|
|  4|151.5| 41.3|     58.5| 18.5|
|  5|180.8| 10.8|     58.4| 12.9|
|  6|  8.7| 48.9|       75|  7.2|
|  7| 57.5| 32.8|     23.5| 11.8|
|  8|120.2| 19.6|     11.6| 13.2|
|  9|  8.6|  2.1|        1|  4.8|
| 10|199.8|  2.6|     21.2| 10.6|
| 11| 66.1|  5.8|     24.2|  8.6|
| 12|214.7|   24|        4| 17.4|
| 13| 23.8| 35.1|     65.9|  9.2|
| 14| 97.5|  7.6|      7.2|  9.7|
| 15|204.1| 32.9|       46|   19|
| 16|195.4| 47.7|     52.9| 22.4|
| 17| 67.8| 36.6|      114| 12.5|
| 18|281.4| 39.6|     55.8| 24.4|
| 19| 69.2| 20.5|     18.3| 11.3|
| 20|147.3| 23.9|     19.1| 14.6|
| 21|218.4| 27.7|     53.4|   18|
| 22|237.4|  5.1|     23.5| 12.5|
| 23| 13.2| 15.9|     49.6|  5.6|
| 24|228.3| 16.9|     26.2| 15.5|
| 25| 62.3| 12.6|     18.3|  9.7|
+---+-----+-----+---------+-----+
only showing top 25 rows

 - **using parallelize() of sparkContext**

In [16]:
df_2 = spark.sparkContext.parallelize([(1, 2, 3, 'a b c'),(4, 5, 6, 'd e f'),
                          (7, 8, 9, 'g h i')]).toDF(['col1', 'col2', 'col3','col4'])
df_2.show()

+----+----+----+-----+
|col1|col2|col3| col4|
+----+----+----+-----+
|   1|   2|   3|a b c|
|   4|   5|   6|d e f|
|   7|   8|   9|g h i|
+----+----+----+-----+



 - **using createDataFrame()**

In [19]:
Employee = spark.createDataFrame(
    [('1', 'Joe', '70000', '1'),
    ('2', 'Henry', '80000', '2'),
    ('3', 'Sam', '60000', '2'),
    ('4', 'Max', '90000', '1')],
    ['Id', 'Name', 'Salary', 'DepartmentId']
)
Employee.show()

+---+-----+------+------------+
| Id| Name|Salary|DepartmentId|
+---+-----+------+------------+
|  1|  Joe| 70000|           1|
|  2|Henry| 80000|           2|
|  3|  Sam| 60000|           2|
|  4|  Max| 90000|           1|
+---+-----+------+------------+



### RDDs support two types of operations:
 - **[transformations](https://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations)**, which create a new dataset from an existing one,
 - **[actions](https://spark.apache.org/docs/latest/rdd-programming-guide.html#actions)**, which return a value to the driver program after running a computation on the dataset.

<img src="img/t.png"/>
<img src="img/a1.png"/>
<img src="img/a2.png"/>


In [33]:
df.columns
df.dtypes

[('_c0', 'string'),
 ('TV', 'string'),
 ('Radio', 'string'),
 ('Newspaper', 'string'),
 ('Sales', 'string')]

In [34]:
drop_name = ['TV', 'Sales']
df.drop(*drop_name).columns

['_c0', 'Radio', 'Newspaper']

In [40]:
df[(df.Radio > 24) & (df.Newspaper < 5)].show(25)

+---+-----+-----+---------+-----+
|_c0|   TV|Radio|Newspaper|Sales|
+---+-----+-----+---------+-----+
| 43|293.6| 27.7|      1.8| 20.7|
|129|220.3|   49|      3.2| 24.7|
|133|  8.4| 27.2|      2.1|  5.7|
|140|184.9| 43.9|      1.7| 20.7|
|194|166.8|   42|      3.6| 19.6|
+---+-----+-----+---------+-----+



In [43]:
import pyspark.sql.functions as F

In [50]:
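# Normalize TV by the column total; collect() brings the single aggregated sum to the driver.
tv_total = df.groupBy().agg(F.sum("TV")).collect()[0][0]
df.withColumn('tv_norm', df.TV / tv_total).show(10)

# Conditional column: the first matching when() wins, otherwise() is the fallback.
df.withColumn('cond', F
              .when((df.TV > 100) & (df.Radio < 40), 1)
              .when(df.Sales > 10, 2)
              .otherwise(3)
             ).show(10)

# Natural logarithm of TV.
df.withColumn('log_tv', F.log(df.TV)).show(10)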

+---+-----+-----+---------+-----+--------------------+
|_c0|   TV|Radio|Newspaper|Sales|             tv_norm|
+---+-----+-----+---------+-----+--------------------+
|  1|230.1| 37.8|     69.2| 22.1|0.007824268493802813|
|  2| 44.5| 39.3|     45.1| 10.4|0.001513167961643...|
|  3| 17.2| 45.9|     69.3|  9.3|5.848649200061207E-4|
|  4|151.5| 41.3|     58.5| 18.5|0.005151571824472517|
|  5|180.8| 10.8|     58.4| 12.9|0.006147882414948061|
|  6|  8.7| 48.9|       75|  7.2|2.958328374449564E-4|
|  7| 57.5| 32.8|     23.5| 11.8|0.001955217029090...|
|  8|120.2| 19.6|     11.6| 13.2|0.004087253685159...|
|  9|  8.6|  2.1|        1|  4.8|2.924324600030603...|
| 10|199.8|  2.6|     21.2| 10.6| 0.00679395412890831|
+---+-----+-----+---------+-----+--------------------+
only showing top 10 rows

+---+-----+-----+---------+-----+----+
|_c0|   TV|Radio|Newspaper|Sales|cond|
+---+-----+-----+---------+-----+----+
|  1|230.1| 37.8|     69.2| 22.1|   1|
|  2| 44.5| 39.3|     45.1| 10.4|   3|
|  3| 17.