
# Glue Studio Notebook
You are now running a **Glue Studio** notebook; before you can start using your notebook you *must* start an interactive session.

## Available Magics
| Magic                       | Type       | Description |
|-----------------------------|------------|-------------|
| %%configure                 | Dictionary | A JSON-formatted dictionary consisting of all configuration parameters for a session. Each parameter can be specified here or through individual magics. |
| %profile                    | String     | Specify a profile in your AWS configuration to use as the credentials provider. |
| %iam_role                   | String     | Specify an IAM role to execute your session with. |
| %region                     | String     | Specify the AWS Region in which to initialize a session. |
| %session_id                 | String     | Returns the session ID for the running session. |
| %connections                | List       | Specify a comma-separated list of connections to use in the session. |
| %additional_python_modules  | List       | Comma-separated list of pip packages, S3 paths or private pip arguments. |
| %extra_py_files             | List       | Comma-separated list of additional Python files from S3. |
| %extra_jars                 | List       | Comma-separated list of additional JARs to include in the cluster. |
| %number_of_workers          | Integer    | The number of workers of a defined worker_type that are allocated when a job runs. worker_type must be set too. |
| %worker_type                | String     | Standard, G.1X, *or* G.2X. number_of_workers must be set too. Default is G.1X. |
| %glue_version               | String     | The version of Glue to be used by this session. Currently, the only valid options are 2.0 and 3.0. |
| %security_configuration     | String     | Define a security configuration to be used with this session. |
| %%sql                       | String     | Run SQL code. All lines after the initial %%sql magic will be passed as part of the SQL code. |
| %streaming                  | String     | Changes the session type to Glue Streaming. |
| %etl                        | String     | Changes the session type to Glue ETL. |
| %status                     |            | Returns the status of the current Glue session including its duration, configuration and executing user / role. |
| %stop_session               |            | Stops the current session. |
| %list_sessions              |            | Lists all currently running sessions by name and ID. |

In [None]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
sc = SparkContext.getOrCreate()  # reuse the session's existing SparkContext
glueContext = GlueContext(sc)
spark = glueContext.spark_session  # SparkSession used by the spark.read calls below
job = Job(glueContext)

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
It looks like there is a newer version of the kernel available. The latest version is 0.34 and you have 0.30 installed.
Please run `pip install --upgrade aws-glue-sessions` to upgrade your kernel
Authenticating with environment variables and user-defined glue_role_arn: arn:aws:iam::533588983801:role/FULL_GLUE
Attempting to use existing AssumeRole session credentials.
Trying to create a Glue session for the kernel.
Worker Type: G.1X
Number of Workers: 5
Session ID: 54aaeac2-74c5-4d33-b9f2-dae565cd43cf
Applying the following default arguments:
--glue_kernel_version 0.30
--enable-glue-datacatalog true
Waiting for session 54aaeac2-74c5-4d33-b9f2-dae565cd43cf to get into ready s

# Spark DFs

## 64. Introduction to Spark DFs

## 65. Creating Spark DFs
1. Create (or reuse) a SparkSession:
```python
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("pySparkDF").getOrCreate()
```

2. Read the data. Spark can read many formats (Parquet, CSV, JDBC, etc.):
```python
df = spark.read.csv(path)
```
By default, Spark treats the header line of a CSV file as an ordinary data row and names the columns `_c0`, `_c1`, and so on. To use the first row as the column names instead, set the `header` option:

```python
df = spark.read.option("header", True).csv(path)
```

In [7]:
df = spark.read.csv('s3://xxxxxxxx/StudentData.csv')
df.show()

+---+------+----------------+------+------+-----+--------------------+
|_c0|   _c1|             _c2|   _c3|   _c4|  _c5|                 _c6|
+---+------+----------------+------+------+-----+--------------------+
|age|gender|            name|course|  roll|marks|               email|
| 28|Female| Hubert Oliveras|    DB| 02984|   59|Annika Hoffman_Na...|
| 29|Female|Toshiko Hillyard| Cloud| 12899|   62|Margene Moores_Ma...|
| 28|  Male|  Celeste Lollis|    PF| 21267|   45|Jeannetta Golden_...|
| 29|Female|    Elenore Choy|    DB| 32877|   29|Billi Clore_Mitzi...|
| 28|  Male|  Sheryll Towler|   DSA| 41487|   41|Claude Panos_Judi...|
| 28|  Male|  Margene Moores|   MVC| 52771|   32|Toshiko Hillyard_...|
| 28|  Male|     Neda Briski|   OOP| 61973|   69|Alberta Freund_El...|
| 28|Female|    Claude Panos| Cloud| 72409|   85|Sheryll Towler_Al...|
| 28|  Male|  Celeste Lollis|   MVC| 81492|   64|Nicole Harwood_Cl...|
| 29|  Male|  Cordie Harnois|   OOP| 92882|   51|Judie Chipps_Clem...|
| 29|F

In [1]:
df = spark.read.option("header", True).csv('s3://xxxxxxxx/StudentData.csv')




In [2]:
df.show(10)

+---+------+----------------+------+-----+-----+--------------------+
|age|gender|            name|course| roll|marks|               email|
+---+------+----------------+------+-----+-----+--------------------+
| 28|Female| Hubert Oliveras|    DB|02984|   59|Annika Hoffman_Na...|
| 29|Female|Toshiko Hillyard| Cloud|12899|   62|Margene Moores_Ma...|
| 28|  Male|  Celeste Lollis|    PF|21267|   45|Jeannetta Golden_...|
| 29|Female|    Elenore Choy|    DB|32877|   29|Billi Clore_Mitzi...|
| 28|  Male|  Sheryll Towler|   DSA|41487|   41|Claude Panos_Judi...|
| 28|  Male|  Margene Moores|   MVC|52771|   32|Toshiko Hillyard_...|
| 28|  Male|     Neda Briski|   OOP|61973|   69|Alberta Freund_El...|
| 28|Female|    Claude Panos| Cloud|72409|   85|Sheryll Towler_Al...|
| 28|  Male|  Celeste Lollis|   MVC|81492|   64|Nicole Harwood_Cl...|
| 29|  Male|  Cordie Harnois|   OOP|92882|   51|Judie Chipps_Clem...|
+---+------+----------------+------+-----+-----+--------------------+
only showing top 10 
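As a rough analogy in plain Python (the standard `csv` module, not Spark), the `header` option only decides whether the first row is kept as data, with positional names like `_c0`, `_c1`, ..., or used as the column names. `read_rows` and `SAMPLE` are hypothetical names, and the sample is a trimmed copy of the rows shown above.

```python
import csv
import io

# Trimmed copy of the first lines of StudentData.csv shown above (first three columns only)
SAMPLE = "age,gender,name\n28,Female,Hubert Oliveras\n29,Female,Toshiko Hillyard\n"


def read_rows(text, header=False):
    """Return (column_names, rows), mimicking the effect of the header option."""
    rows = list(csv.reader(io.StringIO(text)))
    if header:
        # The first row becomes the column names and is removed from the data
        return rows[0], rows[1:]
    # Without a header every row is data, and columns get positional names
    names = [f"_c{i}" for i in range(len(rows[0]))]
    return names, rows


names, rows = read_rows(SAMPLE)
print(names, rows[0])  # ['_c0', '_c1', '_c2'] ['age', 'gender', 'name']

names, rows = read_rows(SAMPLE, header=True)
print(names, rows[0])  # ['age', 'gender', 'name'] ['28', 'Female', 'Hubert Oliveras']
```

In both cases every value is still a string; that is the part that `inferSchema` or an explicit schema changes.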

In [5]:
df = spark.read.option("inferSchema",True).option("header", True).csv('s3://xxxxxxxx/StudentData.csv')




In [9]:
# delimiter sets the field separator: "," (the default) for this file, "\t" for tab-separated files
df = spark.read.options(inferSchema=True, header=True, delimiter=",").csv('s3://xxxxxxxx/StudentData.csv')




In [10]:
df

DataFrame[age: int, gender: string, name: string, course: string, roll: int, marks: int, email: string]


In [4]:
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- name: string (nullable = true)
 |-- course: string (nullable = true)
 |-- roll: integer (nullable = true)
 |-- marks: integer (nullable = true)
 |-- email: string (nullable = true)


If I don't want to infer the schema, I can pass my own with the following code:

In [13]:
from pyspark.sql.types import StructField, StructType, StringType, IntegerType
schema = StructType([
    StructField('age', IntegerType(), True),
    StructField('gender', StringType(), True),
    StructField('name', StringType(), True),
    StructField('course', StringType(), True),
    StructField('roll', StringType(), True),
    StructField('marks', IntegerType(), True),
    StructField('email', StringType(), True)
])




In [20]:
df = spark.read.options(header=True).schema(schema).csv('s3://xxxxxxxx/StudentData.csv')




In [21]:
df.show(10)

+---+------+----------------+------+-----+-----+--------------------+
|age|gender|            name|course| roll|marks|               email|
+---+------+----------------+------+-----+-----+--------------------+
| 28|Female| Hubert Oliveras|    DB|02984|   59|Annika Hoffman_Na...|
| 29|Female|Toshiko Hillyard| Cloud|12899|   62|Margene Moores_Ma...|
| 28|  Male|  Celeste Lollis|    PF|21267|   45|Jeannetta Golden_...|
| 29|Female|    Elenore Choy|    DB|32877|   29|Billi Clore_Mitzi...|
| 28|  Male|  Sheryll Towler|   DSA|41487|   41|Claude Panos_Judi...|
| 28|  Male|  Margene Moores|   MVC|52771|   32|Toshiko Hillyard_...|
| 28|  Male|     Neda Briski|   OOP|61973|   69|Alberta Freund_El...|
| 28|Female|    Claude Panos| Cloud|72409|   85|Sheryll Towler_Al...|
| 28|  Male|  Celeste Lollis|   MVC|81492|   64|Nicole Harwood_Cl...|
| 29|  Male|  Cordie Harnois|   OOP|92882|   51|Judie Chipps_Clem...|
+---+------+----------------+------+-----+-----+--------------------+
only showing top 10 
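The explicit schema declares `roll` as `StringType`, whereas `inferSchema` reads it as an integer (see the printed schema above). A plain-Python sketch of why that matters: `02984` keeps its leading zero as a string, while an integer conversion drops it. `SCHEMA` and `apply_schema` are hypothetical helpers, not Spark APIs.

```python
# Hypothetical mini "schema": column name -> Python converter (not Spark's StructType)
SCHEMA = {"age": int, "roll": str, "marks": int}


def apply_schema(record, schema):
    """Convert every string field with the converter declared for its column."""
    return {name: schema[name](value) for name, value in record.items()}


# First data row from the output above, restricted to three columns
raw = {"age": "28", "roll": "02984", "marks": "59"}

typed = apply_schema(raw, SCHEMA)
print(typed)  # {'age': 28, 'roll': '02984', 'marks': 59}

# Declaring roll as an integer instead drops the leading zero
as_int = apply_schema(raw, {**SCHEMA, "roll": int})
print(as_int["roll"])  # 2984
```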

## 68. Create DF from RDD
```python
df = rdd.toDF()
```

### Creating RDD:

In [1]:
rdd = sc.textFile('s3://xxxxxxxx/StudentData.csv')




In [2]:
col = rdd.first()
rdd = rdd.filter(lambda x: x!= col)




In [3]:
rdd2 = rdd.map(lambda x: x.split(','))
rdd2 = rdd2.map(lambda x: [int(x[0]),x[1],x[2],x[3],x[4],int(x[5]),x[6]])




In [4]:
rdd2.take(10)

[[28, 'Female', 'Hubert Oliveras', 'DB', '02984', 59, 'Annika Hoffman_Naoma Fritts@OOP.com'], [29, 'Female', 'Toshiko Hillyard', 'Cloud', '12899', 62, 'Margene Moores_Marylee Capasso@DB.com'], [28, 'Male', 'Celeste Lollis', 'PF', '21267', 45, 'Jeannetta Golden_Jenna Montague@DSA.com'], [29, 'Female', 'Elenore Choy', 'DB', '32877', 29, 'Billi Clore_Mitzi Seldon@DB.com'], [28, 'Male', 'Sheryll Towler', 'DSA', '41487', 41, 'Claude Panos_Judie Chipps@OOP.com'], [28, 'Male', 'Margene Moores', 'MVC', '52771', 32, 'Toshiko Hillyard_Clementina Menke@MVC.com'], [28, 'Male', 'Neda Briski', 'OOP', '61973', 69, 'Alberta Freund_Elenore Choy@DB.com'], [28, 'Female', 'Claude Panos', 'Cloud', '72409', 85, 'Sheryll Towler_Alberta Freund@Cloud.com'], [28, 'Male', 'Celeste Lollis', 'MVC', '81492', 64, 'Nicole Harwood_Claude Panos@MVC.com'], [29, 'Male', 'Cordie Harnois', 'OOP', '92882', 51, 'Judie Chipps_Clementina Menke@MVC.com']]
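`x.split(',')` works for the rows shown above, but it would break on a field containing a quoted comma. A minimal plain-Python sketch, using a hypothetical quoted row, compares it with the standard `csv` module; presumably a function like `parse` could be passed to `rdd.map` the same way (an assumption, not what this notebook runs). `spark.read.csv` handles quoted fields on its own.

```python
import csv

HEADER = "age,gender,name,course,roll,marks,email"
# Hypothetical row whose name field contains a quoted comma
LINES = [HEADER, '28,Female,"Oliveras, Hubert",DB,02984,59,x@DB.com']


def parse(line):
    """Split one CSV line, honoring quotes, then apply the same casts as the RDD example."""
    f = next(csv.reader([line]))
    return [int(f[0]), f[1], f[2], f[3], f[4], int(f[5]), f[6]]


naive = LINES[1].split(",")
print(len(naive))  # 8: the comma inside the quotes splits the name in two
print(naive[5])    # 02984 (the roll number lands at index 5, where marks was expected)

data = [parse(line) for line in LINES if line != HEADER]  # skip the header, like rdd.filter
print(data[0])  # [28, 'Female', 'Oliveras, Hubert', 'DB', '02984', 59, 'x@DB.com']
```

Note that the naive version would not even fail here: `int(x[5])` happily converts the misplaced roll number, so the marks column would silently hold the wrong value.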


In [5]:
df = rdd2.toDF()




In [6]:
df.show(10)

+---+------+----------------+-----+-----+---+--------------------+
| _1|    _2|              _3|   _4|   _5| _6|                  _7|
+---+------+----------------+-----+-----+---+--------------------+
| 28|Female| Hubert Oliveras|   DB|02984| 59|Annika Hoffman_Na...|
| 29|Female|Toshiko Hillyard|Cloud|12899| 62|Margene Moores_Ma...|
| 28|  Male|  Celeste Lollis|   PF|21267| 45|Jeannetta Golden_...|
| 29|Female|    Elenore Choy|   DB|32877| 29|Billi Clore_Mitzi...|
| 28|  Male|  Sheryll Towler|  DSA|41487| 41|Claude Panos_Judi...|
| 28|  Male|  Margene Moores|  MVC|52771| 32|Toshiko Hillyard_...|
| 28|  Male|     Neda Briski|  OOP|61973| 69|Alberta Freund_El...|
| 28|Female|    Claude Panos|Cloud|72409| 85|Sheryll Towler_Al...|
| 28|  Male|  Celeste Lollis|  MVC|81492| 64|Nicole Harwood_Cl...|
| 29|  Male|  Cordie Harnois|  OOP|92882| 51|Judie Chipps_Clem...|
+---+------+----------------+-----+-----+---+--------------------+
only showing top 10 rows


In [7]:
cols = col.split(',')
df = rdd2.toDF(cols)




In [8]:
df.show(10)

+---+------+----------------+------+-----+-----+--------------------+
|age|gender|            name|course| roll|marks|               email|
+---+------+----------------+------+-----+-----+--------------------+
| 28|Female| Hubert Oliveras|    DB|02984|   59|Annika Hoffman_Na...|
| 29|Female|Toshiko Hillyard| Cloud|12899|   62|Margene Moores_Ma...|
| 28|  Male|  Celeste Lollis|    PF|21267|   45|Jeannetta Golden_...|
| 29|Female|    Elenore Choy|    DB|32877|   29|Billi Clore_Mitzi...|
| 28|  Male|  Sheryll Towler|   DSA|41487|   41|Claude Panos_Judi...|
| 28|  Male|  Margene Moores|   MVC|52771|   32|Toshiko Hillyard_...|
| 28|  Male|     Neda Briski|   OOP|61973|   69|Alberta Freund_El...|
| 28|Female|    Claude Panos| Cloud|72409|   85|Sheryll Towler_Al...|
| 28|  Male|  Celeste Lollis|   MVC|81492|   64|Nicole Harwood_Cl...|
| 29|  Male|  Cordie Harnois|   OOP|92882|   51|Judie Chipps_Clem...|
+---+------+----------------+------+-----+-----+--------------------+
only showing top 10 
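Without arguments, `toDF()` numbers the columns `_1`, `_2`, ...; passing the header fields gives them real names. A plain-Python analogue (the helper `to_df_rows` is hypothetical) pairs names with values the same way, using the first row from `rdd2.take(10)`:

```python
def to_df_rows(rows, names=None):
    """Pair each value with a column name, similar to how rdd.toDF() labels columns."""
    if names is None:
        # With no names, columns are numbered _1, _2, ... (as in the toDF() output above)
        names = [f"_{i}" for i in range(1, len(rows[0]) + 1)]
    return [dict(zip(names, row)) for row in rows]


header = "age,gender,name,course,roll,marks,email"
rows = [[28, "Female", "Hubert Oliveras", "DB", "02984", 59, "Annika Hoffman_Naoma Fritts@OOP.com"]]

print(list(to_df_rows(rows)[0]))  # ['_1', '_2', '_3', '_4', '_5', '_6', '_7']
print(to_df_rows(rows, header.split(","))[0]["name"])  # Hubert Oliveras
```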

In [9]:
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- gender: string (nullable = true)
 |-- name: string (nullable = true)
 |-- course: string (nullable = true)
 |-- roll: string (nullable = true)
 |-- marks: long (nullable = true)
 |-- email: string (nullable = true)


In [10]:
from pyspark.sql.types import StructField, StructType, StringType, IntegerType
schema = StructType([
    StructField('age', IntegerType(), True),
    StructField('gender', StringType(), True),
    StructField('name', StringType(), True),
    StructField('course', StringType(), True),
    StructField('roll', StringType(), True),
    StructField('marks', IntegerType(), True),
    StructField('email', StringType(), True)
])




In [11]:
df2 = spark.createDataFrame(rdd2, schema=schema)




In [12]:
df2.printSchema()

root
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- name: string (nullable = true)
 |-- course: string (nullable = true)
 |-- roll: string (nullable = true)
 |-- marks: integer (nullable = true)
 |-- email: string (nullable = true)


In [13]:
df2.show()

+---+------+----------------+------+------+-----+--------------------+
|age|gender|            name|course|  roll|marks|               email|
+---+------+----------------+------+------+-----+--------------------+
| 28|Female| Hubert Oliveras|    DB| 02984|   59|Annika Hoffman_Na...|
| 29|Female|Toshiko Hillyard| Cloud| 12899|   62|Margene Moores_Ma...|
| 28|  Male|  Celeste Lollis|    PF| 21267|   45|Jeannetta Golden_...|
| 29|Female|    Elenore Choy|    DB| 32877|   29|Billi Clore_Mitzi...|
| 28|  Male|  Sheryll Towler|   DSA| 41487|   41|Claude Panos_Judi...|
| 28|  Male|  Margene Moores|   MVC| 52771|   32|Toshiko Hillyard_...|
| 28|  Male|     Neda Briski|   OOP| 61973|   69|Alberta Freund_El...|
| 28|Female|    Claude Panos| Cloud| 72409|   85|Sheryll Towler_Al...|
| 28|  Male|  Celeste Lollis|   MVC| 81492|   64|Nicole Harwood_Cl...|
| 29|  Male|  Cordie Harnois|   OOP| 92882|   51|Judie Chipps_Clem...|
| 29|Female|       Kena Wild|   DSA|102285|   35|Dustin Feagins_Ma...|
| 29| 

## 70. Select DF Columns
There are three ways to select columns:
```python
from pyspark.sql.functions import col

df.select("col1", "col2")            # column names as strings
df.select(df.col1, df.col2)          # DataFrame attributes
df.select(col("col1"), col("col2"))  # col() expressions
```

In [8]:
df= spark.read.options(inferSchema = True, header = True).csv('s3://xxxxxxxx/StudentData.csv')




In [9]:
df.show(10)

+---+------+----------------+------+-----+-----+--------------------+
|age|gender|            name|course| roll|marks|               email|
+---+------+----------------+------+-----+-----+--------------------+
| 28|Female| Hubert Oliveras|    DB| 2984|   59|Annika Hoffman_Na...|
| 29|Female|Toshiko Hillyard| Cloud|12899|   62|Margene Moores_Ma...|
| 28|  Male|  Celeste Lollis|    PF|21267|   45|Jeannetta Golden_...|
| 29|Female|    Elenore Choy|    DB|32877|   29|Billi Clore_Mitzi...|
| 28|  Male|  Sheryll Towler|   DSA|41487|   41|Claude Panos_Judi...|
| 28|  Male|  Margene Moores|   MVC|52771|   32|Toshiko Hillyard_...|
| 28|  Male|     Neda Briski|   OOP|61973|   69|Alberta Freund_El...|
| 28|Female|    Claude Panos| Cloud|72409|   85|Sheryll Towler_Al...|
| 28|  Male|  Celeste Lollis|   MVC|81492|   64|Nicole Harwood_Cl...|
| 29|  Male|  Cordie Harnois|   OOP|92882|   51|Judie Chipps_Clem...|
+---+------+----------------+------+-----+-----+--------------------+
only showing top 10 

In [10]:
df2 = df.select("gender","gender")
df2.show(2)

+------+------+
|gender|gender|
+------+------+
|Female|Female|
|Female|Female|
+------+------+
only showing top 2 rows


In [11]:
df3 = df.select(df.name, df.gender, df.age)
df3.show(3)

+----------------+------+---+
|            name|gender|age|
+----------------+------+---+
| Hubert Oliveras|Female| 28|
|Toshiko Hillyard|Female| 29|
|  Celeste Lollis|  Male| 28|
+----------------+------+---+
only showing top 3 rows


To use `col()`, import it first with `from pyspark.sql.functions import col`:

In [19]:
from pyspark.sql.functions import col
df.select(col("gender"),col("age")).show(2)

+------+---+
|gender|age|
+------+---+
|Female| 28|
|Female| 29|
+------+---+
only showing top 2 rows


In [20]:
df.select("*").show(2)

+---+------+----------------+------+-----+-----+--------------------+
|age|gender|            name|course| roll|marks|               email|
+---+------+----------------+------+-----+-----+--------------------+
| 28|Female| Hubert Oliveras|    DB| 2984|   59|Annika Hoffman_Na...|
| 29|Female|Toshiko Hillyard| Cloud|12899|   62|Margene Moores_Ma...|
+---+------+----------------+------+-----+-----+--------------------+
only showing top 2 rows



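## 71. withColumn
`withColumn` is used to create a new column or to modify an existing one. It returns a new DataFrame; if a column with the given name already exists, it is replaced.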
```python 
from pyspark.sql.functions import col, lit
df2 = df2.withColumn("Country", lit("US"))
```

In [23]:
df= spark.read.options(inferSchema = True, header = True).csv('s3://xxxxxxxx/StudentData.csv')




In [24]:
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- name: string (nullable = true)
 |-- course: string (nullable = true)
 |-- roll: integer (nullable = true)
 |-- marks: integer (nullable = true)
 |-- email: string (nullable = true)


In [31]:
from pyspark.sql.functions import col
df.withColumn("roll", col("roll")).show(2)

+---+------+----------------+------+-----+-----+--------------------+
|age|gender|            name|course| roll|marks|               email|
+---+------+----------------+------+-----+-----+--------------------+
| 28|Female| Hubert Oliveras|    DB| 2984|   59|Annika Hoffman_Na...|
| 29|Female|Toshiko Hillyard| Cloud|12899|   62|Margene Moores_Ma...|
+---+------+----------------+------+-----+-----+--------------------+
only showing top 2 rows


In [34]:
df2 = df.withColumn("roll", col("roll").cast("String"))




In [35]:
df2.printSchema()

root
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- name: string (nullable = true)
 |-- course: string (nullable = true)
 |-- roll: string (nullable = true)
 |-- marks: integer (nullable = true)
 |-- email: string (nullable = true)


In [36]:
df2 = df2.withColumn("marks2", col("marks")+10)
df2.show(2)

+---+------+----------------+------+-----+-----+--------------------+------+
|age|gender|            name|course| roll|marks|               email|marks2|
+---+------+----------------+------+-----+-----+--------------------+------+
| 28|Female| Hubert Oliveras|    DB| 2984|   59|Annika Hoffman_Na...|    69|
| 29|Female|Toshiko Hillyard| Cloud|12899|   62|Margene Moores_Ma...|    72|
+---+------+----------------+------+-----+-----+--------------------+------+
only showing top 2 rows


In [37]:
df2 = df2.withColumn("marks_subs", col("marks")-col("marks2"))
df2.show(2)

+---+------+----------------+------+-----+-----+--------------------+------+----------+
|age|gender|            name|course| roll|marks|               email|marks2|marks_subs|
+---+------+----------------+------+-----+-----+--------------------+------+----------+
| 28|Female| Hubert Oliveras|    DB| 2984|   59|Annika Hoffman_Na...|    69|       -10|
| 29|Female|Toshiko Hillyard| Cloud|12899|   62|Margene Moores_Ma...|    72|       -10|
+---+------+----------------+------+-----+-----+--------------------+------+----------+
only showing top 2 rows


In [38]:
df2 = df2.withColumn("Country", "US")

AssertionError: col should be Column


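`withColumn` expects a Column as its second argument, not a plain Python value. To fix the previous error, wrap the constant in `lit()`, which creates a Column holding a literal value: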
```python
from pyspark.sql.functions import col, lit
```

In [42]:
from pyspark.sql.functions import col, lit
df2 = df2.withColumn("Country", lit("US"))
df2.show(2)

+---+------+----------------+------+-----+-----+--------------------+------+----------+-------+
|age|gender|            name|course| roll|marks|               email|marks2|marks_subs|Country|
+---+------+----------------+------+-----+-----+--------------------+------+----------+-------+
| 28|Female| Hubert Oliveras|    DB| 2984|   59|Annika Hoffman_Na...|    69|       -10|     US|
| 29|Female|Toshiko Hillyard| Cloud|12899|   62|Margene Moores_Ma...|    72|       -10|     US|
+---+------+----------------+------+-----+-----+--------------------+------+----------+-------+
only showing top 2 rows
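A plain-Python analogue of the three `withColumn` calls above, over a list of dicts instead of a DataFrame. `with_column` and `constant` are hypothetical stand-ins, not PySpark functions: an existing key is replaced in place, a new key is appended at the end, and each call returns new rows instead of modifying the old ones, similar to how `withColumn` returns a new DataFrame.

```python
def with_column(rows, name, fn):
    """Return new rows with `name` set to fn(row): replaced if present, appended if not."""
    return [{**row, name: fn(row)} for row in rows]


def constant(value):
    """Hypothetical stand-in for lit(): the same value for every row."""
    return lambda row: value


rows = [{"roll": 2984, "marks": 59}, {"roll": 12899, "marks": 62}]

out = with_column(rows, "roll", lambda r: str(r["roll"]))    # replace: cast roll to string
out = with_column(out, "marks2", lambda r: r["marks"] + 10)  # add a derived column
out = with_column(out, "Country", constant("US"))            # add a constant column
print(out[0])   # {'roll': '2984', 'marks': 59, 'marks2': 69, 'Country': 'US'}
print(rows[0])  # {'roll': 2984, 'marks': 59}: the input rows are unchanged
```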


In [None]:
df.show()

In [1]:
df.withColumn('',col('')).withColumn('',col('')).show()

NameError: name 'df' is not defined


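## 72. Spark DF withColumnRenamed and Alias
`withColumnRenamed` returns a new DataFrame with a column renamed; `alias` gives a column expression a new name, for example inside `select`, without changing the original DataFrame:
```python
from pyspark.sql.functions import col

df2 = df.withColumnRenamed("gender", "sex")
df.select(col("name").alias("Full Name"))
```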

In [8]:
from pyspark.sql.functions import col, lit
df= spark.read.options(inferSchema = True, header = True).csv('s3://xxxxxxxx/StudentData.csv')
df.show(2)

+---+------+----------------+------+-----+-----+--------------------+
|age|gender|            name|course| roll|marks|               email|
+---+------+----------------+------+-----+-----+--------------------+
| 28|Female| Hubert Oliveras|    DB| 2984|   59|Annika Hoffman_Na...|
| 29|Female|Toshiko Hillyard| Cloud|12899|   62|Margene Moores_Ma...|
+---+------+----------------+------+-----+-----+--------------------+
only showing top 2 rows


In [9]:
df2 = df.withColumnRenamed("gender", "sex").withColumnRenamed("roll", "roll number") # no error is raised if the column doesn't exist
df2.show(2)

+---+------+----------------+------+-----------+-----+--------------------+
|age|   sex|            name|course|roll number|marks|               email|
+---+------+----------------+------+-----------+-----+--------------------+
| 28|Female| Hubert Oliveras|    DB|       2984|   59|Annika Hoffman_Na...|
| 29|Female|Toshiko Hillyard| Cloud|      12899|   62|Margene Moores_Ma...|
+---+------+----------------+------+-----------+-----+--------------------+
only showing top 2 rows
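As the comment says, renaming a missing column is silently ignored. A small plain-Python sketch of that behavior over a list of column names (the helper is hypothetical, not the Spark implementation):

```python
def with_column_renamed(columns, existing, new):
    """Return a new list of column names with `existing` renamed to `new`.

    If `existing` is not present, the names come back unchanged and no error is raised.
    """
    return [new if name == existing else name for name in columns]


columns = ["age", "gender", "name", "course", "roll", "marks", "email"]
renamed = with_column_renamed(columns, "gender", "sex")
renamed = with_column_renamed(renamed, "roll", "roll number")
print(renamed)  # ['age', 'sex', 'name', 'course', 'roll number', 'marks', 'email']
print(with_column_renamed(columns, "missing", "x") == columns)  # True
```

Because a typo in the old name fails silently, it can be worth checking `"gender" in df.columns` before renaming.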


In [14]:
df.select(col("name").alias("Full Name")).show(2) # alias renames only the output column; df itself is unchanged

+----------------+
|       Full Name|
+----------------+
| Hubert Oliveras|
|Toshiko Hillyard|
+----------------+
only showing top 2 rows


In [12]:
df.show(2)

+---+------+----------------+------+-----+-----+--------------------+
|age|gender|            name|course| roll|marks|               email|
+---+------+----------------+------+-----+-----+--------------------+
| 28|Female| Hubert Oliveras|    DB| 2984|   59|Annika Hoffman_Na...|
| 29|Female|Toshiko Hillyard| Cloud|12899|   62|Margene Moores_Ma...|
+---+------+----------------+------+-----+-----+--------------------+
only showing top 2 rows


## 73. Spark DF Filter rows
##### filter / where in DataFrame
`filter()` and `where()` work the same way; `where()` is an alias for `filter()`.

In [1]:
from pyspark.sql.functions import col, lit
df= spark.read.options(inferSchema = True, header = True).csv('s3://xxxxxxxx/StudentData.csv')
df.show(2)

+---+------+----------------+------+-----+-----+--------------------+
|age|gender|            name|course| roll|marks|               email|
+---+------+----------------+------+-----+-----+--------------------+
| 28|Female| Hubert Oliveras|    DB| 2984|   59|Annika Hoffman_Na...|
| 29|Female|Toshiko Hillyard| Cloud|12899|   62|Margene Moores_Ma...|
+---+------+----------------+------+-----+-----+--------------------+
only showing top 2 rows


In [16]:
df.filter(df.course == "DB").show()

+---+------+-----------------+------+-------+-----+--------------------+
|age|gender|             name|course|   roll|marks|               email|
+---+------+-----------------+------+-------+-----+--------------------+
| 28|Female|  Hubert Oliveras|    DB|   2984|   59|Annika Hoffman_Na...|
| 29|Female|     Elenore Choy|    DB|  32877|   29|Billi Clore_Mitzi...|
| 29|  Male|  Ernest Rossbach|    DB| 111449|   53|Maybell Duguay_Ab...|
| 28|Female|   Latia Vanhoose|    DB| 122502|   27|Latia Vanhoose_Mi...|
| 29|Female|   Latia Vanhoose|    DB| 152159|   27|Claude Panos_Sant...|
| 28|Female| Mickey Cortright|    DB| 192537|   62|Ernest Rossbach_M...|
| 28|Female|      Anna Santos|    DB| 311589|   79|Celeste Lollis_Mi...|
| 28|  Male|    Kizzy Brenner|    DB| 381712|   36|Paris Hutton_Kena...|
| 28|  Male| Toshiko Hillyard|    DB| 392218|   47|Leontine Phillips...|
| 29|  Male|     Paris Hutton|    DB| 481229|   57|Clementina Menke_...|
| 28|Female| Mickey Cortright|    DB| 551389|   43|

In [18]:
df.filter(col("course") == "DB").show(3)

+---+------+---------------+------+------+-----+--------------------+
|age|gender|           name|course|  roll|marks|               email|
+---+------+---------------+------+------+-----+--------------------+
| 28|Female|Hubert Oliveras|    DB|  2984|   59|Annika Hoffman_Na...|
| 29|Female|   Elenore Choy|    DB| 32877|   29|Billi Clore_Mitzi...|
| 29|  Male|Ernest Rossbach|    DB|111449|   53|Maybell Duguay_Ab...|
+---+------+---------------+------+------+-----+--------------------+
only showing top 3 rows


In [20]:
df.filter( (df.course == "DB") & (df.marks > 50) ).show(10)

+---+------+----------------+------+------+-----+--------------------+
|age|gender|            name|course|  roll|marks|               email|
+---+------+----------------+------+------+-----+--------------------+
| 28|Female| Hubert Oliveras|    DB|  2984|   59|Annika Hoffman_Na...|
| 29|  Male| Ernest Rossbach|    DB|111449|   53|Maybell Duguay_Ab...|
| 28|Female|Mickey Cortright|    DB|192537|   62|Ernest Rossbach_M...|
| 28|Female|     Anna Santos|    DB|311589|   79|Celeste Lollis_Mi...|
| 29|  Male|    Paris Hutton|    DB|481229|   57|Clementina Menke_...|
| 28|Female| Hubert Oliveras|    DB|771081|   79|Kizzy Brenner_Dus...|
| 29|Female|    Elenore Choy|    DB|811824|   55|Maybell Duguay_Me...|
| 29|  Male|Clementina Menke|    DB|882200|   76|Michelle Ruggiero...|
| 29|Female| Sebrina Maresca|    DB|922210|   54|Toshiko Hillyard_...|
| 29|  Male|    Naoma Fritts|    DB|931295|   79|Hubert Oliveras_S...|
+---+------+----------------+------+------+-----+--------------------+
only s
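The parentheses around each condition are required, not just style. Python's `&` (and likewise `|`) has higher precedence than `==` and `>`, so without them the condition is grouped differently; in PySpark this typically ends in an error about converting a Column to a bool. The standard `ast` module can show how Python groups both versions without running any Spark code (`top_level_node` is a hypothetical helper):

```python
import ast


def top_level_node(expr):
    """Return the class name of the outermost node Python builds for `expr`."""
    return type(ast.parse(expr, mode="eval").body).__name__


without_parens = 'df.course == "DB" & df.marks > 50'
with_parens = '(df.course == "DB") & (df.marks > 50)'

# & binds tighter than == and >, so this is one chained comparison:
# df.course == ("DB" & df.marks) > 50
print(top_level_node(without_parens))  # Compare

# With parentheses, & combines two separate comparisons
print(top_level_node(with_parens))  # BinOp
```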

In [24]:
courses = ['DB', 'Cloud', 'OOP', 'DSA']        #### column.isin(list) keeps rows whose value is in the list
df.filter( (df.course.isin(courses)) & (df.marks > 50) ).show(10)

+---+------+----------------+------+------+-----+--------------------+
|age|gender|            name|course|  roll|marks|               email|
+---+------+----------------+------+------+-----+--------------------+
| 28|Female| Hubert Oliveras|    DB|  2984|   59|Annika Hoffman_Na...|
| 29|Female|Toshiko Hillyard| Cloud| 12899|   62|Margene Moores_Ma...|
| 28|  Male|     Neda Briski|   OOP| 61973|   69|Alberta Freund_El...|
| 28|Female|    Claude Panos| Cloud| 72409|   85|Sheryll Towler_Al...|
| 29|  Male|  Cordie Harnois|   OOP| 92882|   51|Judie Chipps_Clem...|
| 29|  Male| Ernest Rossbach|    DB|111449|   53|Maybell Duguay_Ab...|
| 28|Female|Mickey Cortright|    DB|192537|   62|Ernest Rossbach_M...|
| 28|Female|       Kena Wild| Cloud|221750|   60|Mitzi Seldon_Jenn...|
| 28|Female|    Jc Andrepont|   DSA|232060|   58|Billi Clore_Abram...|
| 29|Female|     Anna Santos| Cloud|242254|   68|Jc Andrepont_Hube...|
+---+------+----------------+------+------+-----+--------------------+
only s

In [27]:
df.filter( df.course.startswith("D") ).show(10) ### startswith

+---+------+----------------+------+------+-----+--------------------+
|age|gender|            name|course|  roll|marks|               email|
+---+------+----------------+------+------+-----+--------------------+
| 28|Female| Hubert Oliveras|    DB|  2984|   59|Annika Hoffman_Na...|
| 29|Female|    Elenore Choy|    DB| 32877|   29|Billi Clore_Mitzi...|
| 28|  Male|  Sheryll Towler|   DSA| 41487|   41|Claude Panos_Judi...|
| 29|Female|       Kena Wild|   DSA|102285|   35|Dustin Feagins_Ma...|
| 29|  Male| Ernest Rossbach|    DB|111449|   53|Maybell Duguay_Ab...|
| 28|Female|  Latia Vanhoose|    DB|122502|   27|Latia Vanhoose_Mi...|
| 29|Female|  Latia Vanhoose|    DB|152159|   27|Claude Panos_Sant...|
| 28|Female|Mickey Cortright|    DB|192537|   62|Ernest Rossbach_M...|
| 28|Female|    Jc Andrepont|   DSA|232060|   58|Billi Clore_Abram...|
| 29|Female|    Paris Hutton|   DSA|271472|   99|Sheryll Towler_Al...|
+---+------+----------------+------+------+-----+--------------------+
only s

In [29]:
df.filter( df.course.endswith("A") ).show(10) ### endswith

+---+------+----------------+------+------+-----+--------------------+
|age|gender|            name|course|  roll|marks|               email|
+---+------+----------------+------+------+-----+--------------------+
| 28|  Male|  Sheryll Towler|   DSA| 41487|   41|Claude Panos_Judi...|
| 29|Female|       Kena Wild|   DSA|102285|   35|Dustin Feagins_Ma...|
| 28|Female|    Jc Andrepont|   DSA|232060|   58|Billi Clore_Abram...|
| 29|Female|    Paris Hutton|   DSA|271472|   99|Sheryll Towler_Al...|
| 28|Female|  Dustin Feagins|   DSA|291984|   82|Abram Nagao_Kena ...|
| 28|Female|Mickey Cortright|   DSA|342003|   44|Mitzi Seldon_Jean...|
| 29|Female|     Anna Santos|   DSA|411479|   42|Kena Wild_Mitzi S...|
| 28|Female|  Maybell Duguay|   DSA|452141|   29|Leontine Phillips...|
| 29|Female|    Paris Hutton|   DSA|492159|   60|Nicole Harwood_Ma...|
| 29|  Male|  Celeste Lollis|   DSA|562065|   85|Jc Andrepont_Mela...|
+---+------+----------------+------+------+-----+--------------------+
only s

In [4]:
df.filter( df.name.contains("nnika") ).show(10) ### contains

+---+------+--------------+------+-------+-----+--------------------+
|age|gender|          name|course|   roll|marks|               email|
+---+------+--------------+------+-------+-----+--------------------+
| 29|  Male|Annika Hoffman|   OOP| 171660|   22|Taryn Brownlee_Mi...|
| 29|  Male|Annika Hoffman|   OOP| 472550|   35|Mitzi Seldon_Abra...|
| 28|  Male|Annika Hoffman| Cloud| 722193|   55|Taryn Brownlee_El...|
| 28|  Male|Annika Hoffman|    DB|1031544|   44|Dustin Feagins_So...|
| 29|Female|Annika Hoffman|   OOP|1551846|   50|Paris Hutton_Mela...|
| 29|Female|Annika Hoffman|    PF|2192313|   49|Naoma Fritts_Tiju...|
| 29|  Male|Annika Hoffman|    PF|2301940|   81|Jenna Montague_Sh...|
| 29|Female|Annika Hoffman|    PF|2532233|   74|Somer Stoecker_An...|
| 29|  Male|Annika Hoffman| Cloud|3051363|   57|Paris Hutton_Leon...|
| 29|  Male|Annika Hoffman|   DSA|3471063|   42|Bonita Higuera_Co...|
+---+------+--------------+------+-------+-----+--------------------+
only showing top 10 rows

In [6]:
df.filter( df.name.like("%ff%") ).show(10) ### like

+---+------+--------------+------+-------+-----+--------------------+
|age|gender|          name|course|   roll|marks|               email|
+---+------+--------------+------+-------+-----+--------------------+
| 29|  Male|Annika Hoffman|   OOP| 171660|   22|Taryn Brownlee_Mi...|
| 29|  Male|Annika Hoffman|   OOP| 472550|   35|Mitzi Seldon_Abra...|
| 28|  Male|Annika Hoffman| Cloud| 722193|   55|Taryn Brownlee_El...|
| 28|  Male|Annika Hoffman|    DB|1031544|   44|Dustin Feagins_So...|
| 29|Female|Annika Hoffman|   OOP|1551846|   50|Paris Hutton_Mela...|
| 29|Female|Annika Hoffman|    PF|2192313|   49|Naoma Fritts_Tiju...|
| 29|  Male|Annika Hoffman|    PF|2301940|   81|Jenna Montague_Sh...|
| 29|Female|Annika Hoffman|    PF|2532233|   74|Somer Stoecker_An...|
| 29|  Male|Annika Hoffman| Cloud|3051363|   57|Paris Hutton_Leon...|
| 29|  Male|Annika Hoffman|   DSA|3471063|   42|Bonita Higuera_Co...|
+---+------+--------------+------+-------+-----+--------------------+
only showing top 10 rows

### 74. Quiz (select, withColumn, filter) 
### 75. Quiz Solution
- [check] Read the file into a DF
- [check] Create a new column in the DF for total marks and let the total marks be 120
- Create a new column average to calculate the average marks of the student: (marks / total marks) * 100
- Filter the students who scored more than 80% in the OOP course and save them in a new DF
- Filter the students who scored more than 60% in the Cloud course and save them in a new DF
- Print the names and marks of all the students from the above DFs

In [30]:
df = spark.read.options(header = True, inferSchema = True).csv('s3://xxxxxxxx/StudentData.csv')
df.show(3)

+---+------+----------------+------+-----+-----+--------------------+
|age|gender|            name|course| roll|marks|               email|
+---+------+----------------+------+-----+-----+--------------------+
| 28|Female| Hubert Oliveras|    DB| 2984|   59|Annika Hoffman_Na...|
| 29|Female|Toshiko Hillyard| Cloud|12899|   62|Margene Moores_Ma...|
| 28|  Male|  Celeste Lollis|    PF|21267|   45|Jeannetta Golden_...|
+---+------+----------------+------+-----+-----+--------------------+
only showing top 3 rows


#### Create a new column in the DF for total marks and let the total marks be 120 

In [31]:
from pyspark.sql.functions import col, lit
df = df.withColumn("total_Marks", lit(120))




In [32]:
df.show(2)

+---+------+----------------+------+-----+-----+--------------------+-----------+
|age|gender|            name|course| roll|marks|               email|total_Marks|
+---+------+----------------+------+-----+-----+--------------------+-----------+
| 28|Female| Hubert Oliveras|    DB| 2984|   59|Annika Hoffman_Na...|        120|
| 29|Female|Toshiko Hillyard| Cloud|12899|   62|Margene Moores_Ma...|        120|
+---+------+----------------+------+-----+-----+--------------------+-----------+
only showing top 2 rows


#### Create a new column average to calculate the average marks of the student: (marks / total marks) * 100

In [33]:
df = df.withColumn("AVG", col("marks")/col("total_Marks")*100)
df.show(10)

+---+------+----------------+------+-----+-----+--------------------+-----------+------------------+
|age|gender|            name|course| roll|marks|               email|total_Marks|               AVG|
+---+------+----------------+------+-----+-----+--------------------+-----------+------------------+
| 28|Female| Hubert Oliveras|    DB| 2984|   59|Annika Hoffman_Na...|        120|49.166666666666664|
| 29|Female|Toshiko Hillyard| Cloud|12899|   62|Margene Moores_Ma...|        120| 51.66666666666667|
| 28|  Male|  Celeste Lollis|    PF|21267|   45|Jeannetta Golden_...|        120|              37.5|
| 29|Female|    Elenore Choy|    DB|32877|   29|Billi Clore_Mitzi...|        120|24.166666666666668|
| 28|  Male|  Sheryll Towler|   DSA|41487|   41|Claude Panos_Judi...|        120|34.166666666666664|
| 28|  Male|  Margene Moores|   MVC|52771|   32|Toshiko Hillyard_...|        120|26.666666666666668|
| 28|  Male|     Neda Briski|   OOP|61973|   69|Alberta Freund_El...|        120| 57.499999

#### Filter the students who scored more than 80% in the OOP course and save them in a new DF

In [36]:
df_marks = df.filter( (df.course == "OOP") &(df.AVG > 80))




In [37]:
df_marks.show(3)

+---+------+------------------+------+-------+-----+--------------------+-----------+-----------------+
|age|gender|              name|course|   roll|marks|               email|total_Marks|              AVG|
+---+------+------------------+------+-------+-----+--------------------+-----------+-----------------+
| 28|  Male|    Jenna Montague|   OOP|3331161|   98|Leontine Phillips...|        120|81.66666666666667|
| 29|Female|Priscila Tavernier|   OOP|3902993|   99|Celeste Lollis_Bi...|        120|             82.5|
| 28|Female|      Judie Chipps|   OOP|5451977|   99|Tamera Blakley_Mi...|        120|             82.5|
+---+------+------------------+------+-------+-----+--------------------+-----------+-----------------+
only showing top 3 rows


#### Filter the students who scored more than 60% in the Cloud course and save them in a new DF

In [38]:
df_marks_cloud = df.filter( (df.course == "Cloud") &(df.AVG > 60))




In [39]:
df_marks_cloud.show(3)

+---+------+--------------+------+------+-----+--------------------+-----------+-----------------+
|age|gender|          name|course|  roll|marks|               email|total_Marks|              AVG|
+---+------+--------------+------+------+-----+--------------------+-----------+-----------------+
| 28|Female|  Claude Panos| Cloud| 72409|   85|Sheryll Towler_Al...|        120|70.83333333333334|
| 29|  Male|   Billi Clore| Cloud|512047|   76|Taryn Brownlee_Ju...|        120|63.33333333333333|
| 28|Female|Somer Stoecker| Cloud|612490|   82|Sebrina Maresca_G...|        120|68.33333333333333|
+---+------+--------------+------+------+-----+--------------------+-----------+-----------------+
only showing top 3 rows


#### Print the names and marks of all the students from the above DFs

In [27]:
df.select(df.name, df.marks).show()  # note: this selects from the full df, not from df_marks / df_marks_cloud

+----------------+-----+
|            name|marks|
+----------------+-----+
| Hubert Oliveras|   59|
|Toshiko Hillyard|   62|
|  Celeste Lollis|   45|
|    Elenore Choy|   29|
|  Sheryll Towler|   41|
|  Margene Moores|   32|
|     Neda Briski|   69|
|    Claude Panos|   85|
|  Celeste Lollis|   64|
|  Cordie Harnois|   51|
|       Kena Wild|   35|
| Ernest Rossbach|   53|
|  Latia Vanhoose|   27|
|  Latia Vanhoose|   55|
|     Neda Briski|   42|
|  Latia Vanhoose|   27|
|  Loris Crossett|   36|
|  Annika Hoffman|   22|
|   Santa Kerfien|   56|
|Mickey Cortright|   62|
+----------------+-----+
only showing top 20 rows


## 76. Spark DF (Count, Distinct, Duplicate)

In [3]:
from pyspark.sql.functions import col, lit
df = spark.read.options(header = True, inferSchema = True).csv('s3://xxxxxxxx/StudentData.csv')
df.show(3)

+---+------+----------------+------+-----+-----+--------------------+
|age|gender|            name|course| roll|marks|               email|
+---+------+----------------+------+-----+-----+--------------------+
| 28|Female| Hubert Oliveras|    DB| 2984|   59|Annika Hoffman_Na...|
| 29|Female|Toshiko Hillyard| Cloud|12899|   62|Margene Moores_Ma...|
| 28|  Male|  Celeste Lollis|    PF|21267|   45|Jeannetta Golden_...|
+---+------+----------------+------+-----+-----+--------------------+
only showing top 3 rows


In [4]:
df.count()

1000


In [8]:
df.filter(df.course == "DB").count()

157


### Distinct

In [13]:
df.select(df.gender, df.age).distinct().count()

4


### Duplicates - dropDuplicates(["col1", "col2", ...])

In [17]:
df.dropDuplicates(["gender", "course"]).show()

+---+------+----------------+------+------+-----+--------------------+
|age|gender|            name|course|  roll|marks|               email|
+---+------+----------------+------+------+-----+--------------------+
| 29|  Male| Ernest Rossbach|    DB|111449|   53|Maybell Duguay_Ab...|
| 28|  Male|  Sheryll Towler|   DSA| 41487|   41|Claude Panos_Judi...|
| 28|  Male|  Margene Moores|   MVC| 52771|   32|Toshiko Hillyard_...|
| 29|Female|Toshiko Hillyard| Cloud| 12899|   62|Margene Moores_Ma...|
| 28|Female|  Alberta Freund|   OOP|251805|   83|Annika Hoffman_Sh...|
| 29|  Male|     Billi Clore| Cloud|512047|   76|Taryn Brownlee_Ju...|
| 28|  Male|     Neda Briski|   OOP| 61973|   69|Alberta Freund_El...|
| 29|Female|  Latia Vanhoose|   MVC|132110|   55|Eda Neathery_Nico...|
| 28|  Male|  Celeste Lollis|    PF| 21267|   45|Jeannetta Golden_...|
| 28|Female| Hubert Oliveras|    DB|  2984|   59|Annika Hoffman_Na...|
| 29|Female|  Loris Crossett|    PF|201487|   96|Elenore Choy_Lati...|
| 29|F

## 77. Quiz (Distinct, Duplicate)
- Write code to display all the unique rows for the age, gender and course columns.

In [1]:
df = spark.read.options(header = True, inferSchema = True).csv('s3://xxxxxxxx/StudentData.csv')




In [2]:
df.show(5)

+---+------+----------------+------+-----+-----+--------------------+
|age|gender|            name|course| roll|marks|               email|
+---+------+----------------+------+-----+-----+--------------------+
| 28|Female| Hubert Oliveras|    DB| 2984|   59|Annika Hoffman_Na...|
| 29|Female|Toshiko Hillyard| Cloud|12899|   62|Margene Moores_Ma...|
| 28|  Male|  Celeste Lollis|    PF|21267|   45|Jeannetta Golden_...|
| 29|Female|    Elenore Choy|    DB|32877|   29|Billi Clore_Mitzi...|
| 28|  Male|  Sheryll Towler|   DSA|41487|   41|Claude Panos_Judi...|
+---+------+----------------+------+-----+-----+--------------------+
only showing top 5 rows


In [10]:
df.select(df.age, df.gender, df.course).distinct().count()

24


In [9]:
df.dropDuplicates(["age","gender","course"]).select(df.age, df.gender, df.course).count()

24


### 79. orderBy() sort()

In [3]:
df = spark.read.options(header = True, inferSchema = True).csv('s3://xxxxxxxx/StudentData.csv')
df.sort(df.age.asc()).show()

+---+------+----------------+------+------+-----+--------------------+
|age|gender|            name|course|  roll|marks|               email|
+---+------+----------------+------+------+-----+--------------------+
| 28|Female|       Kena Wild| Cloud|221750|   60|Mitzi Seldon_Jenn...|
| 28|Female|   Tijuana Kropf|    PF|571047|   41|Loris Crossett_Se...|
| 28|Female|    Jc Andrepont|   DSA|232060|   58|Billi Clore_Abram...|
| 28|Female|    Cheri Kenney|   MVC|321816|   24|Kena Wild_Michell...|
| 28|Female|  Loris Crossett|    PF|332739|   62|Michelle Ruggiero...|
| 28|  Male|  Sheryll Towler|   DSA| 41487|   41|Claude Panos_Judi...|
| 28|Female|Mickey Cortright|   DSA|342003|   44|Mitzi Seldon_Jean...|
| 28|  Male|     Neda Briski|   OOP| 61973|   69|Alberta Freund_El...|
| 28|  Male| Hubert Oliveras|   OOP|351719|   63|Lawanda Wohlwend_...|
| 28|  Male|  Celeste Lollis|   MVC| 81492|   64|Nicole Harwood_Cl...|
| 28|  Male| Sebrina Maresca|    PF|361316|   62|Nicole Harwood_La...|
| 28|F

### 80. Quiz
- Create a DF sorted on bonus in ascending order and show it
- Create a DF sorted on age and salary in descending and ascending order respectively and show it
- Create a DF sorted on age, bonus and salary in descending, descending and ascending order respectively and show it

In [2]:
df = spark.read.options(header = True, inferSchema = True).csv('s3://xxxxxxxx/OfficeData.csv')




In [3]:
df.show(2)

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        James|     Sales|   NY| 90000| 34|10000|
|      Michael|     Sales|   NY| 86000| 56|20000|
+-------------+----------+-----+------+---+-----+
only showing top 2 rows


In [4]:
df2 = df.sort(df.bonus.asc())
df2.show()

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        James|     Sales|   NY| 90000| 34|10000|
|          Jen|   Finance|   NY| 79000| 53|15000|
|         Jeff| Marketing|   CA| 80000| 25|18000|
|        Scott|   Finance|   NY| 83000| 36|19000|
|      Michael|     Sales|   NY| 86000| 56|20000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
|       Robert|     Sales|   CA| 81000| 30|23000|
|        Maria|   Finance|   CA| 90000| 24|23000|
|        Raman|   Finance|   CA| 99000| 40|24000|
+-------------+----------+-----+------+---+-----+


In [7]:
df3 = df.orderBy(df.age.desc(),df.salary.asc())
df3.show()

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|      Michael|     Sales|   NY| 86000| 56|20000|
|          Jen|   Finance|   NY| 79000| 53|15000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
|        Raman|   Finance|   CA| 99000| 40|24000|
|        Scott|   Finance|   NY| 83000| 36|19000|
|        James|     Sales|   NY| 90000| 34|10000|
|       Robert|     Sales|   CA| 81000| 30|23000|
|         Jeff| Marketing|   CA| 80000| 25|18000|
|        Maria|   Finance|   CA| 90000| 24|23000|
+-------------+----------+-----+------+---+-----+


In [8]:
df4 = df.sort(df.age.desc(), df.bonus.desc(), df.salary.asc())
df4.show(3)

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|      Michael|     Sales|   NY| 86000| 56|20000|
|          Jen|   Finance|   NY| 79000| 53|15000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
+-------------+----------+-----+------+---+-----+
only showing top 3 rows


### 82. Group By 
`groupBy()` creates groups of rows that share the same values in the given columns.  
After grouping, apply an aggregate function to each group.
#### Aggregate functions
- sum()  
- count()  
- max() - min()
- avg()
- mean() (an alias of avg(); the result column is still named avg(...))

In [1]:
df = spark.read.options(header = True, inferSchema = True).csv('s3://xxxxxxxx/StudentData.csv')




In [2]:
df.groupBy("gender").sum("marks").show()

+------+----------+
|gender|sum(marks)|
+------+----------+
|Female|     29636|
|  Male|     30461|
+------+----------+


In [10]:
df.groupBy("course").count().show()

+------+-----+
|course|count|
+------+-----+
|   MVC|  157|
|   DSA|  176|
|    DB|  157|
|    PF|  166|
| Cloud|  192|
|   OOP|  152|
+------+-----+


In [6]:
df.groupBy("gender").max("marks").show()
df.groupBy("gender").min("marks").show()

+------+----------+
|gender|max(marks)|
+------+----------+
|Female|        99|
|  Male|        99|
+------+----------+

+------+----------+
|gender|min(marks)|
+------+----------+
|Female|        20|
|  Male|        20|
+------+----------+


In [7]:
df.groupBy("age").avg("marks").show()

+---+------------------+
|age|        avg(marks)|
+---+------------------+
| 28|60.487854251012145|
| 29|59.715415019762844|
+---+------------------+


In [9]:
df.groupBy("gender").mean("marks").show()

+------+------------------+
|gender|        avg(marks)|
+------+------------------+
|Female|59.153692614770456|
|  Male| 61.04408817635271|
+------+------------------+


### 83. Spark DF (Group By - Multiple Columns and Aggregations)
To group by several columns, pass them all to `groupBy()`.
To compute several aggregations at once, use `agg()` with functions imported from `pyspark.sql.functions`:
```python
from pyspark.sql.functions import sum, avg, max, min, count, mean
```
Rename each result column with `.alias("...")`.

In [1]:
df = spark.read.options(header = True, inferSchema = True).csv('s3://xxxxxxxx/StudentData.csv')




In [2]:
df.groupBy(df.gender, df.age).count().show()

+------+---+-----+
|gender|age|count|
+------+---+-----+
|  Male| 28|  238|
|  Male| 29|  261|
|Female| 28|  256|
|Female| 29|  245|
+------+---+-----+


In [3]:
from pyspark.sql.functions import sum, avg, max, min, count, mean




In [4]:
df.groupBy("course").count().show()

+------+-----+
|course|count|
+------+-----+
|   MVC|  157|
|   DSA|  176|
|    DB|  157|
|    PF|  166|
| Cloud|  192|
|   OOP|  152|
+------+-----+


In [5]:
df.groupBy("course").agg(count("*")).show()
df.groupBy("course", "gender").agg(count("*").alias("test1"), sum("marks").alias("sumMarks"), min("age").alias("minAge"), max("marks").alias("maxMarks"), avg("marks").alias("avgMarks")).orderBy("course").show()

+------+--------+
|course|count(1)|
+------+--------+
|   MVC|     157|
|   DSA|     176|
|    DB|     157|
|    PF|     166|
| Cloud|     192|
|   OOP|     152|
+------+--------+

+------+------+-----+--------+------+--------+------------------+
|course|gender|test1|sumMarks|minAge|maxMarks|          avgMarks|
+------+------+-----+--------+------+--------+------------------+
| Cloud|  Male|   86|    5127|    28|      97|59.616279069767444|
| Cloud|Female|  106|    6316|    28|      99| 59.58490566037736|
|    DB|  Male|   82|    5073|    28|      98| 61.86585365853659|
|    DB|Female|   75|    4197|    28|      96|             55.96|
|   DSA|  Male|   78|    4826|    28|      99| 61.87179487179487|
|   DSA|Female|   98|    6124|    28|      99| 62.48979591836735|
|   MVC|Female|   71|    4344|    28|      99|61.183098591549296|
|   MVC|  Male|   86|    5241|    28|      99| 60.94186046511628|
|   OOP|  Male|   70|    4234|    28|      99| 60.48571428571429|
|   OOP|Female|   82|    46

### 86. Quiz (Group By)
- Display the total number of students enrolled in each course
- Display the number of male and female students enrolled in each course
- Display the total marks achieved by each gender in each course
- Display the minimum, maximum and average marks achieved in each course by each age group

In [1]:
df = spark.read.options(header = True, inferSchema = True).csv('s3://xxxxxxxx/StudentData.csv')




In [4]:
df.show(3)

+---+------+----------------+------+-----+-----+--------------------+
|age|gender|            name|course| roll|marks|               email|
+---+------+----------------+------+-----+-----+--------------------+
| 28|Female| Hubert Oliveras|    DB| 2984|   59|Annika Hoffman_Na...|
| 29|Female|Toshiko Hillyard| Cloud|12899|   62|Margene Moores_Ma...|
| 28|  Male|  Celeste Lollis|    PF|21267|   45|Jeannetta Golden_...|
+---+------+----------------+------+-----+-----+--------------------+
only showing top 3 rows


In [8]:
df.groupBy(df.course).count().show()

+------+-----+
|course|count|
+------+-----+
|   MVC|  157|
|   DSA|  176|
|    DB|  157|
|    PF|  166|
| Cloud|  192|
|   OOP|  152|
+------+-----+


In [6]:
df.groupBy(df.gender).count().show()

+------+-----+
|gender|count|
+------+-----+
|Female|  501|
|  Male|  499|
+------+-----+


In [10]:
df.groupBy(df.gender, df.course).sum("marks").show()

+------+------+----------+
|gender|course|sum(marks)|
+------+------+----------+
|  Male|    DB|      5073|
|  Male|   DSA|      4826|
|  Male|   MVC|      5241|
|Female| Cloud|      6316|
|Female|   OOP|      4682|
|  Male| Cloud|      5127|
|  Male|   OOP|      4234|
|Female|   MVC|      4344|
|  Male|    PF|      5960|
|Female|    DB|      4197|
|Female|    PF|      3973|
|Female|   DSA|      6124|
+------+------+----------+


In [13]:
# Minimum, maximum and average marks for each age group
from pyspark.sql.functions import min, max, avg
df.groupBy(df.age).agg(min("marks"), max("marks"), avg("marks")).show()

+---+----------+----------+------------------+
|age|min(marks)|max(marks)|        avg(marks)|
+---+----------+----------+------------------+
| 28|        20|        99|60.487854251012145|
| 29|        20|        99|59.715415019762844|
+---+----------+----------+------------------+


In [None]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

# Reuse the session's SparkContext and take the SparkSession from Glue
# (building a separate SparkSession first is unnecessary, it is overwritten here)
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
It looks like there is a newer version of the kernel available. The latest version is 0.32 and you have 0.30 installed.
Please run `pip install --upgrade aws-glue-sessions` to upgrade your kernel
Authenticating with environment variables and user-defined glue_role_arn: arn:aws:iam::533588983801:role/FULL_GLUE
Attempting to use existing AssumeRole session credentials.
Trying to create a Glue session for the kernel.
Worker Type: G.1X
Number of Workers: 5
Session ID: 8ed6ba1f-ca87-4747-b28e-a7503d063174
Applying the following default arguments:
--glue_kernel_version 0.30
--enable-glue-datacatalog true
Waiting for session 8ed6ba1f-ca87-4747-b28e-a7503d063174 to get into ready s

In [5]:
df = spark.read.options(inferSchema = True).csv('s3://xxxxxxxx/Harry Potter_ The Complete Coll - J.K. Rowling.txt')




In [4]:
df.show(20)

+--------------------+
|            CONTENTS|
+--------------------+
|Harry Potter and ...|
|Harry Potter and ...|
|Harry Potter and ...|
|Harry Potter and ...|
|Harry Potter and ...|
|Harry Potter and ...|
|Harry Potter and ...|
|         FOR JESSICA|
|            FOR ANNE|
|          AND FOR DI|
|            CONTENTS|
|                 ONE|
|   The Boy Who Lived|
|                 TWO|
| The Vanishing Glass|
|               THREE|
|The Letters from ...|
|                FOUR|
|The Keeper of the...|
|                FIVE|
+--------------------+
only showing top 20 rows


In [10]:
from pyspark.sql.functions import split, explode
df2 = df.withColumn('_c0',explode(split('_c0',' ')))




In [13]:
df2.show(10)

+----------+
|       _c0|
+----------+
|  CONTENTS|
|     Harry|
|    Potter|
|       and|
|       the|
|Sorcerer’s|
|     Stone|
|     Harry|
|    Potter|
|       and|
+----------+
only showing top 10 rows


### 90. Spark UDF
- A UDF (user-defined function) applies a regular Python function to column values row by row, similar to `map()` on an RDD

In [3]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
df = spark.read.options(header = True, inferSchema = True).csv('s3://xxxxxxxx/OfficeData.csv')




In [2]:
df.show(3)

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        James|     Sales|   NY| 90000| 34|10000|
|      Michael|     Sales|   NY| 86000| 56|20000|
|       Robert|     Sales|   CA| 81000| 30|23000|
+-------------+----------+-----+------+---+-----+
only showing top 3 rows


In [5]:
def getTotalSalary(salary, bonus):
    return salary + bonus

# Pass the Python function directly; the second argument is the UDF's return type
totalSalaryUDF = udf(getTotalSalary, IntegerType())

df.withColumn("total_salary", totalSalaryUDF(df.salary, df.bonus)).show(3)

+-------------+----------+-----+------+---+-----+------------+
|employee_name|department|state|salary|age|bonus|total_salary|
+-------------+----------+-----+------+---+-----+------------+
|        James|     Sales|   NY| 90000| 34|10000|      100000|
|      Michael|     Sales|   NY| 86000| 56|20000|      106000|
|       Robert|     Sales|   CA| 81000| 30|23000|      104000|
+-------------+----------+-----+------+---+-----+------------+
only showing top 3 rows


### 91. Quiz (UDFs)
- Create a new column increment and provide the increment to employees on the following criteria
    - If the employee is in NY state, their increment is 10% of salary plus 5% of bonus
    - If the employee is in CA state, their increment is 12% of salary plus 3% of bonus

In [14]:
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType, IntegerType
df = spark.read.options(header = True, inferSchema = True).csv('s3://xxxxxxxx/OfficeData.csv')
df.show(3)

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        James|     Sales|   NY| 90000| 34|10000|
|      Michael|     Sales|   NY| 86000| 56|20000|
|       Robert|     Sales|   CA| 81000| 30|23000|
+-------------+----------+-----+------+---+-----+
only showing top 3 rows


In [16]:
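def increment(state, salary, bonus):
    # Returns the new total pay: salary + bonus + the quiz's increment
    if state == 'NY':
        return salary*1.1 + bonus*1.05
    elif state == 'CA':
        return salary*1.12 + bonus*1.03
    else:
        # No increment for other states; float() matches FloatType (an int would come back as null)
        return float(salary + bonus)

incrementUDF = udf(increment, FloatType())
df.withColumn("newAndTotalSalary", incrementUDF(df.state, df.salary, df.bonus)).show()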


+-------------+----------+-----+------+---+-----+-----------------+
|employee_name|department|state|salary|age|bonus|newAndTotalSalary|
+-------------+----------+-----+------+---+-----+-----------------+
|        James|     Sales|   NY| 90000| 34|10000|         109500.0|
|      Michael|     Sales|   NY| 86000| 56|20000|         115600.0|
|       Robert|     Sales|   CA| 81000| 30|23000|         114410.0|
|        Maria|   Finance|   CA| 90000| 24|23000|         124490.0|
|        Raman|   Finance|   CA| 99000| 40|24000|         135600.0|
|        Scott|   Finance|   NY| 83000| 36|19000|         111250.0|
|          Jen|   Finance|   NY| 79000| 53|15000|         102650.0|
|         Jeff| Marketing|   CA| 80000| 25|18000|         108140.0|
|        Kumar| Marketing|   NY| 91000| 50|21000|         122150.0|
+-------------+----------+-----+------+---+-----+-----------------+


### 93. Solution (Cache and Persist)
- `cache()` and `persist()` keep a computed DataFrame stored (by default in memory, spilling to disk if needed) so later actions reuse it instead of recomputing it

### 94. Spark DF (DF to RDD)
- `df.rdd` converts a DataFrame into an RDD of `Row` objects, so RDD operations such as `map()` can be used on it

### 95. Spark DF (Spark SQL)
- Register a DataFrame as a temporary view with `createOrReplaceTempView()`, then query it with SQL through `spark.sql()`

In [12]:
df = spark.read.options(header = True, inferSchema = True).csv('s3://xxxxxxxx/OfficeData.csv')
df.printSchema()

root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- state: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- bonus: integer (nullable = true)


In [3]:
df.createOrReplaceTempView("Office")




In [10]:
spark.sql("select department from Office where age > 30").show()

AnalysisException: 'org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:Insufficient Lake Formation permission(s) on global_temp (Service: AWSGlue; Status Code: 400; Error Code: AccessDeniedException; Request ID: 8e5f84d3-ef6e-4a70-be3e-40b735d6b1c5; Proxy: null));'


In [13]:
df.select("department").filter(df.age > 30).show()

+----------+
|department|
+----------+
|     Sales|
|     Sales|
|   Finance|
|   Finance|
|   Finance|
| Marketing|
+----------+


In [None]:
spark.sql("select department , avg(age) from Office group by department").show()

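### 96. Spark DF (Write DF)
#### Modes:
- overwrite: replace any data already at the output path
- append: add the new rows to the existing data
- ignore: do nothing if data already exists at the path
- error (also `errorifexists`, the default): fail if data already exists at the path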

In [1]:
df.write.options(header = True).csv('s3://xxxxxxxx/output')

Exception encountered while retrieving session: An error occurred (ExpiredTokenException) when calling the GetSession operation: The security token included in the request is expired 
Traceback (most recent call last):
  File "/home/jupyter-user/.local/lib/python3.7/site-packages/aws_glue_interactive_sessions_kernel/glue_pyspark/GlueKernel.py", line 688, in get_current_session
    current_session = self.glue_client.get_session(Id=self.get_session_id())["Session"]
  File "/home/jupyter-user/.local/lib/python3.7/site-packages/botocore/client.py", line 415, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/home/jupyter-user/.local/lib/python3.7/site-packages/botocore/client.py", line 745, in _make_api_call
    raise error_class(parsed_response, operation_name)
botocore.exceptions.ClientError: An error occurred (ExpiredTokenException) when calling the GetSession operation: The security token included in the request is expired
Failed to retrieve session status 
Ex

In [None]:
df.write.mode("overwrite").options(header = True).csv('s3://xxxxxxxx/output')


### 97. Project Overview
- Print the total number of employees in the company
- Print the total number of departments in the company
- Print the department names of the company
- Print the total number of employees in each department
- Print the total number of employees in each state
- Print the total number of employees in each state in each department
- Print the minimum and maximum salaries in each department and sort salaries in ascending order
- Print the names of employees working in NY state under the Finance department whose bonuses are greater than the average bonus of employees in NY state
- Raise the salaries of all employees whose age is greater than 45 by $500
- Create a DF of all employees whose age is greater than 45 and save it to a file

In [None]:
df = spark.read.options(header = True, inferSchema = True).csv('s3://xxxxxxxx/OfficeDataProject.csv')




In [4]:
df.show(3)

+-----------+-----------------+----------+-----+------+---+-----+
|employee_id|    employee_name|department|state|salary|age|bonus|
+-----------+-----------------+----------+-----+------+---+-----+
|       1000|        Nitz Leif| Marketing|   CA|  6131| 26|  543|
|       1001|  Melissia Dedman|   Finance|   AK|  4027| 43| 1290|
|       1002|Rudolph Barringer|        HR|   LA|  3122| 43| 1445|
+-----------+-----------------+----------+-----+------+---+-----+
only showing top 3 rows


In [10]:
df.select(df.department).distinct().count()

6


In [1]:
df.select(df.department).distinct().count()
df.select(df.department).distinct().show()

NameError: name 'df' is not defined


In [10]:
from pyspark.sql.functions import sum, avg, max, min, count, mean
df.groupBy("department").agg(count("employee_name").alias("n")).show()

+----------+---+
|department|  n|
+----------+---+
| Marketing|170|
|        HR|171|
|  Accounts|162|
|     Sales|169|
|   Finance|162|
|Purchasing|166|
+----------+---+


In [16]:
df.groupBy("state").agg(count("employee_name").alias("n")).show()

+-----+---+
|state|  n|
+-----+---+
|   LA|205|
|   NY|173|
|   WA|208|
|   AK|209|
|   CA|205|
+-----+---+


In [17]:
df.groupBy("state","department").agg(count("employee_name").alias("n")).show()

+-----+----------+---+
|state|department|  n|
+-----+----------+---+
|   LA|  Accounts| 29|
|   NY|        HR| 30|
|   AK|  Accounts| 37|
|   CA|     Sales| 42|
|   CA|  Accounts| 35|
|   CA| Marketing| 33|
|   CA|Purchasing| 32|
|   WA|        HR| 47|
|   WA|     Sales| 27|
|   WA| Marketing| 39|
|   LA|        HR| 41|
|   AK|Purchasing| 30|
|   LA|     Sales| 35|
|   WA|  Accounts| 27|
|   NY|  Accounts| 34|
|   NY|     Sales| 27|
|   NY|Purchasing| 21|
|   WA|Purchasing| 38|
|   LA| Marketing| 26|
|   NY| Marketing| 30|
+-----+----------+---+
only showing top 20 rows


In [14]:
from pyspark.sql.functions import col, max, min
df.groupBy("department").agg(max("salary").alias("max"), min("salary").alias("min")).orderBy(col("max").asc(), col("min").asc()).show()  # sort ascending by max salary, then by min salary

+----------+----+----+
|department| max| min|
+----------+----+----+
|  Accounts|9890|1007|
|   Finance|9899|1006|
| Marketing|9974|1031|
|        HR|9982|1013|
|     Sales|9982|1103|
|Purchasing|9985|1105|
+----------+----+----+


- Print the names of employees in the Finance department in NY state whose bonus is greater than the average bonus of all employees in NY state
- Raise the salaries by $500 for all employees whose age is greater than 45
- Create a DataFrame of all employees whose age is greater than 45 and save it to a file

In [31]:
df.createOrReplaceTempView("Office")
spark.sql("""SELECT employee_name FROM Office WHERE state = 'NY' AND department = 'Finance'
             AND bonus > (SELECT avg(bonus) FROM Office WHERE state = 'NY')""").show()

AnalysisException: 'org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:Insufficient Lake Formation permission(s) on global_temp (Service: AWSGlue; Status Code: 400; Error Code: AccessDeniedException; Request ID: 234722c0-7573-4856-979e-05c3fe8ad007; Proxy: null));'


In [None]:
df.show(2)


In [None]:
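from pyspark.sql.functions import col, avg
# Average bonus of NY employees, collected to the driver as a Python number
avg_ny_bonus = df.filter(col("state") == "NY").agg(avg("bonus")).first()[0]
# NY employees above that average; there is no department filter, so all departments appear, not only Finance
df.select("employee_name", "department", "state", "bonus") \
    .filter((col("state") == "NY") & (col("bonus") > avg_ny_bonus)).show()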

+--------------------+----------+-----+-----+
|       employee_name|department|state|bonus|
+--------------------+----------+-----+-----+
|       Marvis Cobian|Purchasing|   NY| 1765|
|          Trena Benz|  Accounts|   NY| 1624|
|       Vivan Sifford|   Finance|   NY| 1261|
|        Jaclyn Layla|        HR|   NY| 1289|
|         Escoto Kohn|Purchasing|   NY| 1533|
|   Kaczorowski Dynes|Purchasing|   NY| 1802|
|    Lisabeth Gallman| Marketing|   NY| 1623|
|       Suzanne Gilma|Purchasing|   NY| 1840|
|          Luisa Jere|     Sales|   NY| 1866|
|      Herder Gallman|   Finance|   NY| 1402|
|        Vivan Locust|        HR|   NY| 1313|
|          Nena Rocha|   Finance|   NY| 1647|
|       Leif Lemaster|   Finance|   NY| 1782|
|         Imai Locust|     Sales|   NY| 1809|
|Ellingsworth Meli...|   Finance|   NY| 1358|
|     Toussaint Tyree|        HR|   NY| 1573|
|     Antonio Juliana|     Sales|   NY| 1338|
|        Escoto Gilma|   Finance|   NY| 1285|
|        Soules Mckey|        HR| 

- Raise the salaries by $500 for all employees whose age is greater than 45

In [15]:
df.show(3)

+-----------+-----------------+----------+-----+------+---+-----+
|employee_id|    employee_name|department|state|salary|age|bonus|
+-----------+-----------------+----------+-----+------+---+-----+
|       1000|        Nitz Leif| Marketing|   CA|  6131| 26|  543|
|       1001|  Melissia Dedman|   Finance|   AK|  4027| 43| 1290|
|       1002|Rudolph Barringer|        HR|   LA|  3122| 43| 1445|
+-----------+-----------------+----------+-----+------+---+-----+
only showing top 3 rows


- Raise the salaries by $500 for all employees whose age is greater than 45 (using a UDF)
- Raise the salaries by $500 for all employees whose age is greater than 45 (using withColumn)