
<h1><center>Pyspark</center></h1>

In [None]:
# Basic Imports #

# `from __future__` imports must be the first statement of the cell,
# otherwise Python raises a SyntaxError.
from __future__ import print_function

import os
import random, warnings, sys, time
from datetime import datetime, tzinfo, timedelta
from operator import add

# Silence warnings before the third-party imports below, as the original cell did.
warnings.filterwarnings("ignore")

import matplotlib.pyplot as plt  # `plt` is used by the word-cloud cell
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from wordcloud import WordCloud, STOPWORDS

In [None]:
# Display the SparkContext. `sc` is not created anywhere in this notebook --
# presumably it is injected by the PySpark shell/kernel (TODO confirm).
sc

In [None]:
# Read the meetup CSV as an RDD of raw text lines.
# NOTE(review): absolute, machine-specific path; the same file is read again
# further down with `spark.read.csv`.
rdd = sc.textFile("/home/kaushikamaravadi/workspace/yotabites/kaushik/spark_meetup/data/meetup.csv")

<br>
### Another way to create an rdd


In [None]:
# Distribute a local list holding the integers 1 through 10 as an RDD.
rdd1 = sc.parallelize(list(range(1, 11)))


### Sum the squares of the integers from 1 to 10.

In [None]:
# Square each element, then add up the squares.
rdd1.map(lambda n: n * n).sum()

<br>
### Display all the elements in the rdd using collect()
Not a good choice if you have a huge dataset, because every element is brought back to the driver.

In [None]:
# Bring every element of `rdd1` back to the driver as a Python list.
rdd1.collect()

### `take(n)`
Instead use take which gives you n elements

In [None]:
# Fetch only the first element of `rdd` (the first line of the CSV file).
rdd.take(1)

### Counting the number of elements in the file

In [None]:
# Number of elements in `rdd`, i.e. the number of text lines read from the file.
rdd.count()

RDDs support two types of operations: transformations, which create a new dataset from an existing one, and actions, which return a value to the driver program after running a computation on the dataset. 
We just performed two actions on our RDDs, i.e. **count** and **collect** (**take** is an action as well).

## Transformations 

### `map`

In [None]:
# Split every line on commas, then pair the full field list with field 19.
# NOTE(review): a plain split does not honour quoted commas, and `[19]`
# raises IndexError for rows with fewer than 20 fields.
split_rdd = rdd.map(lambda line: line.split(","))
map_rdd = split_rdd.map(lambda fields: (fields, fields[19]))
map_rdd.take(1)

### `flatMap`

In [None]:
# Split every line on commas and flatten the pieces into one RDD of fields.
flatmap_rdd = rdd.flatMap(lambda line: line.split(","))
# Peek at the first 15 fields.
flatmap_rdd.take(15)

### `filter`

In [None]:
# Keep only the fields that are exactly the string "CA", then count them.
filter_rdd = flatmap_rdd.filter(lambda field: field == "CA")
filter_rdd.count()

### `reduceByKey`

In [None]:
from operator import add

# Pair every field with 1, then sum the ones per distinct field value.
field_pairs = flatmap_rdd.map(lambda word: (word, 1))
add_func = field_pairs.reduceByKey(add)
add_func.take(2)

### WordCount

In [None]:
# Classic word count: break each field into space-separated words, emit
# (word, 1) pairs and sum the ones per word.
counts = (
    flatmap_rdd
    .flatMap(lambda field: field.split(" "))
    .map(lambda word: (word, 1))
    .reduceByKey(lambda left, right: left + right)
)

In [None]:
# Show five (word, count) pairs from the word count.
counts.take(5)

### `sortByKey( ascending=True|False )`

In [None]:
# Sort by key (the word) in descending order and show the first five pairs.
counts.sortByKey(ascending=False).take(5)

In [None]:
# Sort by the count (second tuple element), largest first: the five most frequent words.
counts.sortBy(lambda pair: pair[1], ascending=False).take(5)

<br>
### Pi Calculation

In [None]:
%%time
import random

def inside(p):
    x, y = random.random(), random.random()
    return x*x + y*y < 1

count = sc.parallelize(range(0, 10000000)) \
             .filter(inside).count()
print("Pi is roughly %f" % (4.0 * count / 10000000))

# Actions

### `countByKey`

In [None]:
# Use each whole line as the key and count how many times every distinct line occurs.
# The result is returned to the driver, so it grows with the number of distinct lines.
rdd.map(lambda row: (row, 1)).countByKey()

### `first()`

In [None]:
# First (word, count) pair of the word-count RDD.
counts.first()

### `takeOrdered(n, [ordering])`

In [None]:
# Five smallest elements in ascending order. Elements are (word, count)
# tuples, so they compare by word first.
counts.takeOrdered(5)

### `top(n)`

In [None]:
# Five largest elements in descending order. Elements are (word, count)
# tuples, so they compare by word first, not by count.
counts.top(5)

### `saveAsTextFile(path)`

In [None]:
# Write the (word, count) pairs out as text under the given output path.
# NOTE(review): absolute, machine-specific path; re-running presumably fails
# once the output location already exists -- confirm.
counts.saveAsTextFile("/home/kaushikamaravadi/Downloads/bdkcSampleTextOutput")

### `saveAsSequenceFile(path)`

In [None]:
# Write the (word, count) pairs out as a sequence file under the given output path.
# NOTE(review): absolute, machine-specific path; re-running presumably fails
# once the output location already exists -- confirm.
counts.saveAsSequenceFile("/home/kaushikamaravadi/Downloads/bdkcSampleSequenceOutput")

<br>
<br>

## DataFrames 

### Read the CSV File

In [None]:
# Load the same meetup CSV as a DataFrame, using the first row as header and
# letting Spark infer the column types.
# `spark` is not created in this notebook -- presumably the SparkSession is
# injected by the PySpark shell/kernel (TODO confirm).
df = spark.read.csv('/home/kaushikamaravadi/workspace/yotabites/kaushik/spark_meetup/data/meetup.csv',header=True,inferSchema=True)

In [None]:
# Add a `date` column holding `eventtime` cast to a date.
df = df.withColumn('date', df.eventtime.cast('date'))

### `printSchema`

In [None]:
# Print the column names and inferred types of the meetup DataFrame.
df.printSchema()

<br>
### `show()` - Display the dataframe 

In [None]:
# Print the first rows of the DataFrame as a text table.
df.show()

<br>
### Most Frequent Events

In [None]:
! pip install wordcloud

word_cloud = df.filter(df.eventname != 'None').select("eventname")

word_cloud_pandas = word_cloud.toPandas()
plt.rcParams['figure.figsize']=(12.0,15.0)    
plt.rcParams['font.size']=12                 
plt.rcParams['savefig.dpi']=100              
plt.rcParams['figure.subplot.bottom']=.1 


stopwords = set(STOPWORDS)
wordcloud = WordCloud(
                          background_color='black',
                          stopwords=stopwords,
                          max_words=100,
                          random_state=4
                         ).generate(str(word_cloud_pandas['eventname']))

print(wordcloud)
fig = plt.figure(1)
plt.imshow(wordcloud, interpolation="bilinear")
plt.axis('off')
plt.show()

<br>

### Most Responses per state

In [None]:
# Count the responses per state, busiest state first.
# NOTE(review): this cell compares `groupstate` with the string 'null', while
# other cells use 'None' or SQL `is not null` -- confirm how missing states
# are actually represented in the data.
rows_with_state = df.filter(df.groupstate != 'null')
most_responses = (
    rows_with_state
    .groupby("groupstate")
    .agg({"response": "count"})
    .orderBy("count(response)", ascending=False)
)
most_responses.show()

<br>
<br>

### Most Responses per City

In [None]:
# Row count per (groupcity, memberid) pair, largest first.
# NOTE(review): the heading says "per City", but grouping also by `memberid`
# yields counts per member within a city -- confirm the intent.
resp_city = (
    df.groupby(["groupcity", "memberid"])
    .count()
    .orderBy("count", ascending=False)
)

resp_city.show()

<br>
<br>
<br>

### Yes vs No

In [None]:
# Row count per (eventname, response) pair, largest first. The original built
# this identical aggregation twice; it is built once here and both historical
# names are kept as aliases.
response_counts = df.groupby(df['eventname'], df['response']).count().orderBy("count", ascending=False)
yes_response = response_counts
no_response = response_counts

# The grouped frame has one row per (eventname, response) pair, so these are
# the numbers of events that received at least one 'yes' / 'no' -- not the
# total numbers of yes / no responses.
# NOTE(review): if totals were intended, sum the `count` column instead -- confirm.
yes = yes_response.filter(yes_response.response == 'yes').count()
no = no_response.filter(no_response.response == 'no').count()



<br>
<br>
<br>

### Most Number of Events 

In [None]:
# Expose the DataFrame to Spark SQL under the view name "satori".
df.createOrReplaceTempView("satori")

# Rows per event name (only rows with a non-null state), most frequent first.
event_states = spark.sql(
    "select eventname,count(*) from satori  "
    "where groupstate is not null group by eventname  order by count(1) desc"
)
event_states.show()

<br>
<br>
<br>

### Number of members who attended the meetup

In [None]:

# Row count per (groupstate, memberid) pair, largest first, skipping rows whose
# state is the literal string 'None'.
res = (
    df.filter(df.groupstate != 'None')
    .groupby(["groupstate", "memberid"])
    .count()
    .orderBy("count", ascending=False)
)
res.show()

### RDBMS

In [None]:
# Load the `tblemployees` table from the local MySQL `dbo` database over JDBC.
# The password is taken from the MYSQL_PASSWORD environment variable instead
# of being committed in the notebook; a missing variable raises KeyError.
jdbc_tblemployees = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3306/dbo") \
    .option("dbtable", "tblemployees") \
    .option("user", "root") \
    .option("password", os.environ["MYSQL_PASSWORD"]) \
    .option("driver","com.mysql.jdbc.Driver") \
    .load()

In [None]:
# Print the column names and types of the employees table.
jdbc_tblemployees.printSchema()

In [None]:
# Load the `tblpayemployeeparamdetails` table from the local MySQL `dbo`
# database over JDBC.
# The password is taken from the MYSQL_PASSWORD environment variable instead
# of being committed in the notebook; a missing variable raises KeyError.
jdbc_tblpayemployeeparamdetails = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3306/dbo") \
    .option("dbtable", "tblpayemployeeparamdetails") \
    .option("user", "root") \
    .option("password", os.environ["MYSQL_PASSWORD"]) \
    .option("driver","com.mysql.jdbc.Driver") \
    .load()

In [None]:
# Print the column names and types of the pay-parameter details table.
jdbc_tblpayemployeeparamdetails.printSchema()

In [None]:
# Inner-join employees with their pay-parameter rows on EmployeeNumber and
# drop the employees-side copy of the key so the column appears only once.
same_employee = (
    jdbc_tblemployees['EmployeeNumber'] == jdbc_tblpayemployeeparamdetails['EmployeeNumber']
)
joined_df = (
    jdbc_tblemployees
    .join(jdbc_tblpayemployeeparamdetails, same_employee, "inner")
    .drop(jdbc_tblemployees['EmployeeNumber'])
)
joined_df.count()

In [None]:
# Join the pay-parameter rows against employees on EmployeeNumber, then count the rows.
# NOTE(review): the variable is called `outer_join`, but the join type passed
# is 'leftsemi' -- confirm which one is intended.
# NOTE(review): a left-semi join presumably returns only the left table's
# columns, which would make the `drop` of the employees-side column a no-op
# (it is also spelled 'Employeenumber' here) -- verify.
outer_join = jdbc_tblpayemployeeparamdetails.join(jdbc_tblemployees, jdbc_tblemployees['EmployeeNumber'] == jdbc_tblpayemployeeparamdetails['EmployeeNumber'],'leftsemi').\
             drop(jdbc_tblemployees['Employeenumber'])
outer_join.count()

In [None]:
# Mean Amount per department, highest first.
# `show()` only prints and returns None, so the DataFrame is stored first and
# displayed in a separate statement; otherwise `average_sal` would be None.
average_sal = joined_df.groupBy("DepartmentCode").agg({'Amount':"mean"}).orderBy("avg(Amount)",ascending=False)
average_sal.show()

In [None]:
# Mean Amount per employee, highest first.
# `show()` only prints and returns None, so the DataFrame is stored first and
# displayed in a separate statement; otherwise `average_sal` would be None and
# a trailing bare `average_sal` would display nothing.
average_sal = outer_join.groupby("EmployeeNumber").agg({'Amount':"mean"}).orderBy("avg(Amount)",ascending=False)
average_sal.show()

In [None]:
# Expose the joined employees/pay DataFrame to Spark SQL under the view name
# "table"; the SQL cells that follow select `from table`.
joined_df.createOrReplaceTempView("table")

In [None]:
# Average Amount per employee, highest first.
# Group by EmployeeNumber only: also grouping by Amount puts every distinct
# amount in its own group, so avg(Amount) would just echo Amount back.
spark.sql("select EmployeeNumber, avg(Amount) as avg from table group by EmployeeNumber order by avg desc ").show()

In [None]:
# Row count per (LocationType, Sex) pair, ignoring rows without a Sex value,
# largest group first.
spark.sql(
    "select LocationType, Sex, count(*) as count from table "
    "where Sex is not null "
    "group by LocationType,Sex order by count desc"
).show()

In [None]:
# Streaming context on top of the existing SparkContext, with a 1-second batch interval.
ssc = StreamingContext(sc, batchDuration=1)

In [None]:
# Streaming word count over text arriving at the given path.
# NOTE(review): `textFileStream` presumably expects a directory to monitor for
# new files, while this path looks like a single file -- confirm.
lines = ssc.textFileStream("/home/kaushikamaravadi/test.txt")

# NOTE(review): this rebinds `counts`, which earlier cells use for the batch
# word-count RDD; re-running those cells afterwards will see the stream instead.
counts = lines.flatMap(lambda line: line.split(" "))\
                  .map(lambda x: (x, 1))\
                  .reduceByKey(lambda a, b: a+b)

# Print each batch and also write it out using the given path as a prefix.
counts.pprint()
counts.saveAsTextFiles("/home/kaushikamaravadi/output.txt")

ssc.start()
try:
    # Let the stream run for at most 10 seconds.
    ssc.awaitTermination(10)
finally:
    # Always stop the streaming context, even if waiting was interrupted or failed.
    # NOTE(review): with default arguments `stop()` presumably also stops the
    # underlying SparkContext, so `sc` is unusable afterwards -- confirm.
    ssc.stop()

In [None]:
# Stop the streaming context. The previous cell already calls `ssc.stop()`,
# so this is only needed if that cell was interrupted before reaching it.
ssc.stop()