# First contact with df

#### Batch data

Data can be loaded in through a CSV, JSON, XML, or a Parquet file. 

A DataFrame can also be created from an existing RDD, or from an external data store such as Hive or Cassandra.

It can also take in data from HDFS or the local file system.

In [1]:
import findspark
import pyspark
import time
import operator
from pyspark import SparkConf
from pyspark import SparkContext
import pandas as pd


conf = SparkConf()
conf.setMaster("local")
conf.setAppName("spark-basic")
sc = SparkContext(conf = conf)

A first example with 'fake' data:

In [2]:
data1 = {'PassengerId': {0: 1, 1: 2, 2: 3, 3: 4, 4: 5, 5: 6 },
         'Name': {0: 'Owen', 1: 'Florence', 2: 'Laina', 3: 'Lily', 4: 'William', 5: 'Jack'},
         'Sex': {0: 'male', 1: 'female', 2: 'female', 3: 'female', 4: 'male', 5: 'male'},
         'Survived': {0: 0, 1: 1, 2: 1, 3: 1, 4: 0, 5: 0}}

data2 = {'Id': {0: 1, 1: 2, 2: 3, 3: 4, 4: 5},
         'Age': {0: 22, 1: 38, 2: 26, 3: 35, 4: 35},
         'Fare': {0: 7.3, 1: 71.3, 2: 7.9, 3: 53.1, 4: 8.0},
         'Pclass': {0: 3, 1: 1, 2: 3, 3: 1, 4: 3}}

df1_pd = pd.DataFrame(data1, columns=data1.keys())
df2_pd = pd.DataFrame(data2, columns=data2.keys())


We need a SparkSession to process this type of data. Caution: many examples on the Internet use a variable called "spark" without showing where it comes from. It is the SparkSession object created in the next cell.

In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

df1 = spark.createDataFrame(df1_pd)
df2 = spark.createDataFrame(df2_pd)
df1.show()

+-----------+--------+------+--------+
|PassengerId|    Name|   Sex|Survived|
+-----------+--------+------+--------+
|          1|    Owen|  male|       0|
|          2|Florence|female|       1|
|          3|   Laina|female|       1|
|          4|    Lily|female|       1|
|          5| William|  male|       0|
|          6|    Jack|  male|       0|
+-----------+--------+------+--------+



Summary of the properties of our object

In [4]:
df1.printSchema()

root
 |-- PassengerId: long (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Survived: long (nullable = true)



### Operating with dataframes

If you have experience coding with the package dplyr from R, you have more than half of it covered!

**Filter**, **select**, **mutate**, **summarize** and **arrange** are the basic verbs here too, although some of them have different names...

In [5]:
df1.select(["Name","Sex"]).show()

+--------+------+
|    Name|   Sex|
+--------+------+
|    Owen|  male|
|Florence|female|
|   Laina|female|
|    Lily|female|
| William|  male|
|    Jack|  male|
+--------+------+



In [6]:
df1.filter(df1.Sex == 'female').filter(df1.Survived == 1).show() 

+-----------+--------+------+--------+
|PassengerId|    Name|   Sex|Survived|
+-----------+--------+------+--------+
|          2|Florence|female|       1|
|          3|   Laina|female|       1|
|          4|    Lily|female|       1|
+-----------+--------+------+--------+



But we can also do it SQL-style. Note that the condition is a SQL expression passed as a string, not Python syntax!

In [7]:
df1.filter("Sex='female'").show()

+-----------+--------+------+--------+
|PassengerId|    Name|   Sex|Survived|
+-----------+--------+------+--------+
|          2|Florence|female|       1|
|          3|   Laina|female|       1|
|          4|    Lily|female|       1|
+-----------+--------+------+--------+



And this is the equivalent of mutate....

In [8]:
hello = df2.withColumn('PricePerYear', df2.Fare/df2.Age)

In [9]:
hello

DataFrame[Id: bigint, Age: bigint, Fare: double, Pclass: bigint, PricePerYear: double]

Groupby is needed when we want to summarize our data. The columns to aggregate can be passed either as an unpacked list or as a dictionary:

In [10]:
df2.groupby('Pclass').avg(*["Age","Fare"]).show()

+------+------------------+-----------------+
|Pclass|          avg(Age)|        avg(Fare)|
+------+------------------+-----------------+
|     1|              36.5|             62.2|
|     3|27.666666666666668|7.733333333333333|
+------+------------------+-----------------+



In [11]:
df2.groupby('Pclass').agg({'Age': 'avg', 'Fare': 'avg'}).show()

+------+------------------+-----------------+
|Pclass|          avg(Age)|        avg(Fare)|
+------+------------------+-----------------+
|     1|              36.5|             62.2|
|     3|27.666666666666668|7.733333333333333|
+------+------------------+-----------------+



If you are not familiar with the `*` syntax, it means "unpack": each element of the list is passed as a separate argument, so avg receives the column names one by one.
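As a plain-Python illustration of unpacking (independent of Spark; `describe_columns` is a hypothetical helper, not a PySpark function), the sketch below shows that passing `*cols` is the same as writing each element out as its own positional argument:

```python
def describe_columns(*names):
    """Accept any number of column names as separate positional arguments."""
    return "avg of: " + ", ".join(names)

cols = ["Age", "Fare"]

# Unpacked: equivalent to describe_columns("Age", "Fare")
unpacked = describe_columns(*cols)

# Written out by hand: same call, same result
explicit = describe_columns("Age", "Fare")

print(unpacked)
print(unpacked == explicit)
```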

If we want multiple functions at the same time, we need a dictionary:

In [12]:
df2.groupby('Pclass').agg({'*': 'count', 'Age': 'avg', 'Fare': 'sum'}).show()

+------+--------+------------------+---------+
|Pclass|count(1)|          avg(Age)|sum(Fare)|
+------+--------+------------------+---------+
|     1|       2|              36.5|    124.4|
|     3|       3|27.666666666666668|     23.2|
+------+--------+------------------+---------+



To present the summarized data in a nicer format, we can use toDF() to rename the columns:

In [13]:
(
    df2.groupby('Pclass').agg({'*': 'count', 'Age': 'avg', 'Fare': 'sum'})
    .toDF('Pclass', 'counts', 'average_age', 'total_fare')
    .show()
)

+------+------+------------------+----------+
|Pclass|counts|       average_age|total_fare|
+------+------+------------------+----------+
|     1|     2|              36.5|     124.4|
|     3|     3|27.666666666666668|      23.2|
+------+------+------------------+----------+



And finally, we can **arrange** dataframes using the method sort

In [14]:
df2.sort('Age', ascending=True).show()

+---+---+----+------+
| Id|Age|Fare|Pclass|
+---+---+----+------+
|  1| 22| 7.3|     3|
|  3| 26| 7.9|     3|
|  4| 35|53.1|     1|
|  5| 35| 8.0|     3|
|  2| 38|71.3|     1|
+---+---+----+------+



Good reference: https://dzone.com/articles/pyspark-dataframe-tutorial-introduction-to-datafra 

## Basic joins and unions

In [21]:
df1.join(df2, df2.Id == df1.PassengerId).show()

+-----------+--------+------+--------+---+---+----+------+
|PassengerId|    Name|   Sex|Survived| Id|Age|Fare|Pclass|
+-----------+--------+------+--------+---+---+----+------+
|          5| William|  male|       0|  5| 35| 8.0|     3|
|          1|    Owen|  male|       0|  1| 22| 7.3|     3|
|          3|   Laina|female|       1|  3| 26| 7.9|     3|
|          2|Florence|female|       1|  2| 38|71.3|     1|
|          4|    Lily|female|       1|  4| 35|53.1|     1|
+-----------+--------+------+--------+---+---+----+------+



In [22]:
df1.show()

+-----------+--------+------+--------+
|PassengerId|    Name|   Sex|Survived|
+-----------+--------+------+--------+
|          1|    Owen|  male|       0|
|          2|Florence|female|       1|
|          3|   Laina|female|       1|
|          4|    Lily|female|       1|
|          5| William|  male|       0|
|          6|    Jack|  male|       0|
+-----------+--------+------+--------+



Is it symmetric? Where is Jack? :'(

If you have some time, explore other types of join, such as "left", "right", "cross" and "inner".

In [None]:
#### Your code here
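To see where Jack went, here is a plain-Python sketch of the difference between an inner join (the default join type in PySpark) and a left join. It models the two tables as dictionaries keyed by id, using values from the tables above. It only illustrates the semantics and says nothing about how Spark actually executes a join.

```python
# Passenger names from df1 and ages from df2, keyed by id
passengers = {1: "Owen", 2: "Florence", 3: "Laina", 4: "Lily", 5: "William", 6: "Jack"}
ages = {1: 22, 2: 38, 3: 26, 4: 35, 5: 35}

# Inner join: keep only ids present on both sides
inner = [(pid, name, ages[pid]) for pid, name in passengers.items() if pid in ages]

# Left join: keep every passenger, filling the missing side with None
left = [(pid, name, ages.get(pid)) for pid, name in passengers.items()]

print(len(inner), len(left))
print(left[-1])
```

In PySpark the join type is chosen with the `how` argument of `join`, for example `how="left"`.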

And union doesn't do what we might expect... so why would we need it?

In [23]:
df1.union(df2).show()

+-----------+--------+------+--------+
|PassengerId|    Name|   Sex|Survived|
+-----------+--------+------+--------+
|          1|    Owen|  male|       0|
|          2|Florence|female|       1|
|          3|   Laina|female|       1|
|          4|    Lily|female|       1|
|          5| William|  male|       0|
|          6|    Jack|  male|       0|
|          1|      22|   7.3|       3|
|          2|      38|  71.3|       1|
|          3|      26|   7.9|       3|
|          4|      35|  53.1|       1|
|          5|      35|   8.0|       3|
+-----------+--------+------+--------+
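The result above makes sense once we notice that `union` matches columns by position, not by name. The following plain-Python sketch mimics that behaviour with one row from each table; `union_by_position` is a hypothetical helper written for this illustration, not a Spark function.

```python
# Each "table" is a list of column names plus rows stored as tuples
cols1 = ["PassengerId", "Name", "Sex", "Survived"]
rows1 = [(1, "Owen", "male", 0)]
cols2 = ["Id", "Age", "Fare", "Pclass"]
rows2 = [(1, 22, 7.3, 3)]

def union_by_position(cols_a, rows_a, cols_b, rows_b):
    """Stack rows_b under rows_a, keeping the first table's column names."""
    if len(cols_a) != len(cols_b):
        raise ValueError("both tables need the same number of columns")
    return cols_a, rows_a + rows_b

cols, rows = union_by_position(cols1, rows1, cols2, rows2)
for row in rows:
    print(dict(zip(cols, row)))
```

This is why union is mainly useful for stacking tables that share the same schema, such as several files with identical columns. PySpark also offers `unionByName` when the columns should be matched by name instead.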



A somewhat weird join... what is going on here? Can you work out why it would be useful (and dangerous)?

In [25]:
df1.join(df2, df1.PassengerId <= df2.Id).show()

+-----------+--------+------+--------+---+---+----+------+
|PassengerId|    Name|   Sex|Survived| Id|Age|Fare|Pclass|
+-----------+--------+------+--------+---+---+----+------+
|          1|    Owen|  male|       0|  1| 22| 7.3|     3|
|          1|    Owen|  male|       0|  2| 38|71.3|     1|
|          1|    Owen|  male|       0|  3| 26| 7.9|     3|
|          1|    Owen|  male|       0|  4| 35|53.1|     1|
|          1|    Owen|  male|       0|  5| 35| 8.0|     3|
|          2|Florence|female|       1|  2| 38|71.3|     1|
|          2|Florence|female|       1|  3| 26| 7.9|     3|
|          2|Florence|female|       1|  4| 35|53.1|     1|
|          2|Florence|female|       1|  5| 35| 8.0|     3|
|          3|   Laina|female|       1|  3| 26| 7.9|     3|
|          3|   Laina|female|       1|  4| 35|53.1|     1|
|          3|   Laina|female|       1|  5| 35| 8.0|     3|
|          4|    Lily|female|       1|  4| 35|53.1|     1|
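|          4|    Lily|female|       1|  5| 35| 8.0|     3|
|          5| William|  male|       0|  5| 35| 8.0|     3|
+-----------+--------+------+--------+---+---+----+------+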
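A plain-Python sketch of what the inequality condition does, using only the id columns of the two tables. It is an illustration of the matching logic, not of Spark's execution: every passenger is paired with all rows whose Id is greater than or equal to its own, so the result grows much faster than with an equality condition.

```python
passenger_ids = [1, 2, 3, 4, 5, 6]   # df1.PassengerId
fare_ids = [1, 2, 3, 4, 5]           # df2.Id

# Equality condition: at most one match per passenger
equi = [(p, f) for p in passenger_ids for f in fare_ids if p == f]

# Inequality condition: every passenger pairs with all ids >= its own
non_equi = [(p, f) for p in passenger_ids for f in fare_ids if p <= f]

print(len(equi), len(non_equi))
```

With these small tables the difference is 5 rows against 15, but on large tables the number of pairs can approach the product of the two table sizes, which is where the danger lies. Conditions like this are useful for range lookups, for example pairing each record with all later ones.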

And... what about doing joins with SQL?

If we want to do that, we will need temporary views! It sounds worse than it actually is.

In [27]:
df1.createOrReplaceTempView('df1_temp')
df2.createOrReplaceTempView('df2_temp')

query = '''
    select *
    from df1_temp a, df2_temp b
    where a.PassengerId = b.Id'''

spark.sql(query).show()

+-----------+--------+------+--------+---+---+----+------+
|PassengerId|    Name|   Sex|Survived| Id|Age|Fare|Pclass|
+-----------+--------+------+--------+---+---+----+------+
|          5| William|  male|       0|  5| 35| 8.0|     3|
|          1|    Owen|  male|       0|  1| 22| 7.3|     3|
|          3|   Laina|female|       1|  3| 26| 7.9|     3|
|          2|Florence|female|       1|  2| 38|71.3|     1|
|          4|    Lily|female|       1|  4| 35|53.1|     1|
+-----------+--------+------+--------+---+---+----+------+



In [28]:
from pyspark.sql.functions import round, col

resumen = df2.groupby('Pclass').avg("Age").toDF("Clase","Media")
resumen.select("*",round(col("Media"),2)).show()

+-----+------------------+---------------+
|Clase|             Media|round(Media, 2)|
+-----+------------------+---------------+
|    1|              36.5|           36.5|
|    3|27.666666666666668|          27.67|
+-----+------------------+---------------+



## Importing and exporting data

#### Indirectly

In [31]:
prod = pd.read_excel("C:/Users/joangj/OneDrive - NETMIND/BTS/2020/data/FoodMarket.xlsx",sheet_name="Products")
cust = pd.read_excel("C:/Users/joangj/OneDrive - NETMIND/BTS/2020/data/FoodMarket.xlsx",sheet_name="Customers")
purc = pd.read_excel("C:/Users/joangj/OneDrive - NETMIND/BTS/2020/data/FoodMarket.xlsx",sheet_name="Purchases")
sell = pd.read_excel("C:/Users/joangj/OneDrive - NETMIND/BTS/2020/data/FoodMarket.xlsx",sheet_name="Sellers")

In [32]:
prod = spark.createDataFrame(prod)
cust = spark.createDataFrame(cust)
purc = spark.createDataFrame(purc)
sell = spark.createDataFrame(sell)

In [33]:
purc.show()

+----+-----+-------+--------+------+
|Code|Buyer|Product|Quantity|Seller|
+----+-----+-------+--------+------+
|   1|  139|     14|       7|    33|
|   2|  127|     51|       3|    17|
|   3|  137|     10|      15|    22|
|   4|   58|     54|       3|    30|
|   5|  196|     57|       8|    11|
|   6|  135|     19|       6|     3|
|   7|   28|     59|      19|    10|
|   8|  193|     60|      18|    18|
|   9|   68|      5|       3|    33|
|  10|   30|     51|      12|     3|
|  11|   80|      8|       9|    29|
|  12|   90|     12|       3|    12|
|  13|   72|     30|       7|    38|
|  14|  174|     53|      11|    15|
|  15|  167|     35|      11|    45|
|  16|  125|     13|      16|    21|
|  17|  120|     24|       1|    20|
|  18|  141|     28|      16|    13|
|  19|  198|     35|      11|    49|
|  20|   42|     32|       6|    52|
+----+-----+-------+--------+------+
only showing top 20 rows



#### Directly

In [34]:
spark.read.csv("C:/Users/joangj/OneDrive - NETMIND/BTS/2020/data/FoodMarket.csv",sep=";",header = True).show()

+----+--------------------+-----+
|Code|                Name|Price|
+----+--------------------+-----+
|   1|    American cheeses| 0,62|
|   2|Appellation d'Ori...|15,58|
|   3|     Apple cultivars| 15,8|
|   4|        Bacon dishes| 15,6|
|   5|   Bacon substitutes| 5,22|
|   6|     Basil cultivars| 2,21|
|   7|              Breads|14,87|
|   8| Breakfast beverages| 2,17|
|   9|   Breakfast cereals| 2,27|
|  10|     Breakfast foods| 5,51|
|  11|     British cheeses|14,68|
|  12|               Cakes|16,66|
|  13|             Candies|11,25|
|  14|             Cheeses| 4,43|
|  15|        Cheese soups| 7,21|
|  16|Christmas dishes ...| 3,06|
|  17|           Cocktails| 8,55|
|  18|             Cookies| 3,16|
|  19|Dishes using coco...|16,58|
|  20|               Diets|17,72|
+----+--------------------+-----+
only showing top 20 rows



How would you solve the problem of the decimal format?
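One possible approach, sketched here in plain Python on a few values copied from the Price column above: replace the decimal comma with a point and convert to a number. `parse_decimal_comma` is a hypothetical helper, and it assumes the values contain no thousands separators.

```python
def parse_decimal_comma(text):
    """Convert a string such as '15,58' into the float 15.58."""
    return float(text.replace(",", "."))

prices = ["0,62", "15,58", "15,8"]   # values copied from the Price column above
parsed = [parse_decimal_comma(p) for p in prices]
print(parsed)
```

The same idea carries over to a Spark DataFrame by applying `regexp_replace` from `pyspark.sql.functions` to the column and then casting it to `double`.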

#### Writing the file back again

An easy way, but it generates a new folder and extra files...

In [42]:
prod.write.csv("Newdata",header = True) 

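AnalysisException: path file:/C:/Users/joangj/OneDrive - NETMIND/BTS/2020/RealTime/Newdata already exists.;

The write fails because the Newdata folder already exists from a previous run. Passing mode="overwrite" to write.csv replaces the existing folder instead of raising an error.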

Using pandas...

In [43]:
prod.toPandas().to_csv('mycsv.csv')

Let's see how many partitions our data has!

In [44]:
purc.rdd.getNumPartitions() 

1

In [39]:
new_df = purc.groupBy("Seller").count()

In [45]:
new_df.show()

+------+-----+
|Seller|count|
+------+-----+
|    29|   32|
|    26|   25|
|    19|   28|
|    54|   29|
|    22|   37|
|     7|   34|
|    34|   23|
|    50|   29|
|    57|   18|
|    32|   18|
|    43|   28|
|    31|   26|
|    39|   39|
|    25|   33|
|     6|   17|
|    58|   20|
|     9|   21|
|    27|   20|
|    56|   25|
|    51|   18|
+------+-----+
only showing top 20 rows



In [41]:
new_df.rdd.getNumPartitions() 

200

As you can see, the number of partitions suddenly increases. This is because the Spark SQL module has the default configuration spark.sql.shuffle.partitions set to 200, and a groupBy triggers a shuffle.

Depending on your use case, this can be beneficial or harmful. In this particular case, the data volume is not enough to fill all 200 partitions, which causes unnecessary map-reduce operations and ultimately the creation of very small files in HDFS, which is not desirable.
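To make the point about underfilled partitions concrete, here is a plain-Python sketch that assigns grouping keys to partitions by hash. It assumes the 58 distinct sellers are numbered 1 to 58, and it uses Python's built-in `hash`, whereas Spark uses its own hash function, so the exact assignment differs. The upper bound still holds: 58 keys can never occupy more than 58 partitions.

```python
def count_non_empty(keys, num_partitions):
    """Assign each key to a partition by hash and count partitions that receive data."""
    used = {hash(key) % num_partitions for key in keys}
    return len(used)

seller_ids = range(1, 59)   # assumption: 58 distinct sellers numbered 1..58

for n in (200, 10):
    filled = count_non_empty(seller_ids, n)
    print(f"{n} partitions: {filled} non-empty, {n - filled} empty")
```

With 200 partitions most of them stay empty, while with 10 partitions every one receives some keys.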

If we want to change this parameter (and many others), we can do it through the SQLContext. With a SparkSession, spark.conf.set("spark.sql.shuffle.partitions", "10") has the same effect.

In [46]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
sqlContext.setConf("spark.sql.shuffle.partitions", "10")

#### Testing performance: does the order matter?

This is a good moment to check whether the order of the operations affects the performance or not!

We generate a test dataframe:

In [71]:
test = purc
for i in range(0,20):
    test = test.union(purc)

test.count()

31500

In [74]:
start_time = time.time()

print(test.filter(test.Seller >= 5).filter(test.Seller == 5.0).count())

print("--- %s seconds ---" % (time.time() - start_time))

609
--- 12.142670392990112 seconds ---


In [75]:
start_time = time.time()

print(test.filter(test.Seller == 5.0).filter(test.Seller >= 5).count())

print("--- %s seconds ---" % (time.time() - start_time))

609
--- 11.958776712417603 seconds ---


### Shared variables... why and how?

For parallel processing, Apache Spark uses shared variables. When the driver sends a task to an executor, a copy of each shared variable goes to every node of the cluster so that it can be used while performing the task.

#### Broadcast
Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. 

![Broadcast.PNG](attachment:Broadcast.PNG)

Why would we need it?! Find a useful case

#### Accumulator
Accumulator variables are used for aggregating the information through associative and commutative operations. For example, you can use an accumulator for a sum operation or counters (in MapReduce). 

![Accumulator.PNG](attachment:Accumulator.PNG)
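Why must the operation be associative and commutative? Because each task adds up its own part of the data and the partial results reach the driver in no particular order. The plain-Python simulation below illustrates this for a sum. The four "partitions" are an assumption made for the example, and no Spark API is involved.

```python
import operator
import random
from functools import reduce

values = list(range(1, 101))

# Pretend the data is split across four partitions, each summed by its own task
partitions = [values[i::4] for i in range(4)]
partial_sums = [sum(part) for part in partitions]

# The partial results can reach the driver in any order
random.shuffle(partial_sums)
total = reduce(operator.add, partial_sums, 0)

print(total)  # 5050, whatever the order of the partial sums
```

In PySpark the corresponding object is created with `sc.accumulator(0)` and updated inside tasks with its `add` method.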

In [None]:
### Explore how foreach works. Find a relevant example.

## Working a bit more with SQL (or not...) - OPEN EXERCISE

### A quick refresher on the main keywords!

**SELECT**

**FROM**


**WHERE**

**GROUP BY**

**HAVING**

**AND** / **OR**

**SELECT DISTINCT**

**COUNT**

**ORDER BY**

**...**

And another way to start thinking in a SQL-style...

In [81]:
purc.select('Seller').distinct().groupBy().count().show() # Number of different sellers in the Purchases table

+-----+
|count|
+-----+
|   58|
+-----+



Why do we need the groupby statement?

And the classic SQL way to do it....

In [80]:
purc.createOrReplaceTempView('purc_temp')

spark.sql('''select count (distinct Seller) from purc_temp''').show()

+----------------------+
|count(DISTINCT Seller)|
+----------------------+
|                    58|
+----------------------+



Try some of the SQL you already know and see how it integrates with PySpark!

In [None]:
### YOUR CODE HERE

## SQL, and a bit more...

One last example before we finish! We will follow this example: https://opensource.com/article/19/3/apache-spark-and-dataframes-tutorial

Loading and preparing the data:

In [82]:
from pyspark.sql import Row
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
sqlContext

data_file = "C:/Users/joangj/OneDrive - NETMIND/BTS - PySpark/2020/data/kddcup.data_10_percent_corrected"
raw_rdd = sc.textFile(data_file).cache()

csv_rdd = raw_rdd.map(lambda row: row.split(","))

parsed_rdd = csv_rdd.map(lambda r: Row(
    duration=int(r[0]),
    protocol_type=r[1],
    service=r[2],
    flag=r[3],
    src_bytes=int(r[4]),
    dst_bytes=int(r[5]),
    wrong_fragment=int(r[7]),
    urgent=int(r[8]),
    hot=int(r[9]),
    num_failed_logins=int(r[10]),
    num_compromised=int(r[12]),
    su_attempted=r[14],
    num_root=int(r[15]),
    num_file_creations=int(r[16]),
    label=r[-1]
    )
)

df = sqlContext.createDataFrame(parsed_rdd)

df.createOrReplaceTempView("connections")


Translate this first query:

In [83]:
labels = sqlContext.sql("""
                           SELECT label, count(*) as freq
                           FROM connections
                           GROUP BY label
                           ORDER BY 2 DESC
                           """)
labels.show()

+----------------+------+
|           label|  freq|
+----------------+------+
|          smurf.|280790|
|        neptune.|107201|
|         normal.| 97278|
|           back.|  2203|
|          satan.|  1589|
|        ipsweep.|  1247|
|      portsweep.|  1040|
|    warezclient.|  1020|
|       teardrop.|   979|
|            pod.|   264|
|           nmap.|   231|
|   guess_passwd.|    53|
|buffer_overflow.|    30|
|           land.|    21|
|    warezmaster.|    20|
|           imap.|    12|
|        rootkit.|    10|
|     loadmodule.|     9|
|      ftp_write.|     8|
|       multihop.|     7|
+----------------+------+
only showing top 20 rows



Translate this second query:

In [None]:
attack_protocol = sqlContext.sql("""
                           SELECT
                             protocol_type,
                             CASE label
                               WHEN 'normal.' THEN 'no attack'
                               ELSE 'attack'
                             END AS state,
                             COUNT(*) as freq
                           FROM connections
                           GROUP BY protocol_type, state
                           ORDER BY 3 DESC
                           """)
attack_protocol.show()

Translate this third query:

In [84]:
tcp_attack_stats = sqlContext.sql("""
                                   SELECT
                                     t.service,
                                     t.attack_type,
                                     t.total_freq
                                   FROM
                                   (SELECT
                                     service,
                                     label as attack_type,
                                     COUNT(*) as total_freq,
                                     ROUND(AVG(duration), 2) as mean_duration,
                                     SUM(num_failed_logins) as total_failed_logins,
                                     SUM(num_file_creations) as total_file_creations,
                                     SUM(su_attempted) as total_root_attempts,
                                     SUM(num_root) as total_root_acceses
                                   FROM connections
                                   WHERE protocol_type = 'tcp'
                                   AND label != 'normal.'
                                   GROUP BY service, attack_type
                                   ORDER BY total_freq DESC) as t
                                     WHERE t.mean_duration > 0
                                   """)

tcp_attack_stats.show()

+--------+----------------+----------+
| service|     attack_type|total_freq|
+--------+----------------+----------+
|    http|           back.|      2203|
| private|      portsweep.|       725|
|ftp_data|    warezclient.|       708|
|     ftp|    warezclient.|       307|
|   other|      portsweep.|       260|
| private|          satan.|       170|
|  telnet|   guess_passwd.|        53|
|  telnet|buffer_overflow.|        21|
|ftp_data|    warezmaster.|        18|
|   imap4|           imap.|        12|
|  telnet|        rootkit.|         5|
|  telnet|     loadmodule.|         5|
|   other|    warezclient.|         5|
|  supdup|      portsweep.|         4|
|    http|            phf.|         4|
|  telnet|           perl.|         3|
|   pop_3|      portsweep.|         3|
|    http|        ipsweep.|         3|
|csnet_ns|      portsweep.|         3|
|  gopher|        ipsweep.|         3|
+--------+----------------+----------+
only showing top 20 rows



And now... a pivot table:

In [85]:
tcp_attack_stats.groupby('service').pivot('attack_type').agg({'total_freq':'max'}).na.fill(0).show()

+---------+-----+----------------+----------+-------------+-----+--------+-----------+---------+-----+----+----------+--------+------+----+------------+------------+
|  service|back.|buffer_overflow.|ftp_write.|guess_passwd.|imap.|ipsweep.|loadmodule.|multihop.|perl.|phf.|portsweep.|rootkit.|satan.|spy.|warezclient.|warezmaster.|
+---------+-----+----------------+----------+-------------+-----+--------+-----------+---------+-----+----+----------+--------+------+----+------------+------------+
|   telnet|    0|              21|         0|           53|    0|       1|          5|        2|    3|   0|         0|       5|     1|   2|           0|           0|
|      ftp|    0|               1|         2|            0|    0|       1|          1|        2|    0|   0|         0|       1|     1|   0|         307|           2|
|    pop_3|    0|               0|         0|            0|    0|       0|          0|        0|    0|   0|         3|       0|     1|   0|           0|           0|
|  d
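For intuition about what the pivot does, here is a plain-Python sketch on four rows copied from the tcp_attack_stats output above. It builds one row per service and one column per attack type, keeps the maximum frequency per cell and uses 0 where a combination does not occur. It is an illustration only, not how Spark implements `pivot`.

```python
# (service, attack_type, total_freq) rows copied from the tcp_attack_stats output above
rows = [
    ("telnet", "guess_passwd.", 53),
    ("telnet", "buffer_overflow.", 21),
    ("ftp", "warezclient.", 307),
    ("pop_3", "portsweep.", 3),
]

attack_types = sorted({attack for _, attack, _ in rows})
services = sorted({service for service, _, _ in rows})

# Start every cell at 0 (the role of na.fill(0); safe here because counts are never negative),
# then keep the max per cell
table = {s: {a: 0 for a in attack_types} for s in services}
for service, attack, freq in rows:
    table[service][attack] = max(table[service][attack], freq)

print("service", *attack_types)
for service in services:
    print(service, *(table[service][a] for a in attack_types))
```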

A nicer way of showing it? How would you communicate it?

In [None]:
### Your option here!

# Assignment I

We are going to use the 4 tables we imported before to generate a full report about the purchases of these supermarkets. Some questions we might want to answer are... (just general guidelines, the idea is that you impress me a bit).

- How do we merge all the info? How much memory are we saving if we split it into tables? (You don't need to use pyspark for this, just pandas.)
- How many products have been sold?
- Which product has been sold to the most people?
- Which seller is making the most money?
- Generate a report for each of the products, top customers, top sellers, revenue...
- ...

You can use all the tools you have available, not only pyspark!

In [12]:
#### Let's start working on it!

In [13]:
sc._jsc.sc().uiWebUrl().get()

'http://MAD-SURF002.netmind.local:4040'