# PYSPARK - RDD

In [None]:
# pip install arquivo.whl
# pip install C:\Pasta\arquivo.whl

# If the wheel has dependencies, you can point pip at a folder in which to look for them:
# pip install --no-index --find-links c:\Pasta programa

## Importing Libraries

In [None]:
import os                                                           # Environment variables (credentials)
from operator import add                                            # Binary "+" used by fold() / foldByKey()

import pandas as pd                                                 # pandas, used for CSV -> Spark conversions

from pyspark import SparkContext                                    # Importing SparkContext
from pyspark.sql import SparkSession, Window, Row                   # Importing importing methods for creating a cluster
from pyspark.sql import functions as F                              # Importing SQL Functions
from pyspark.sql.functions import col, isnan, when, count           # Importing relevant dataframe functions
from pyspark.sql.functions import *                                 # Importing inbuilt SQL Functions
from pyspark.sql.types import *                                     # Importing SQL types

## Initializing Spark

### SparkContext 

In [None]:
from pyspark import SparkContext
sc = SparkContext(master = 'local[2]')

### Inspect SparkContext 

In [None]:
# Each expression inspects the active SparkContext. In a notebook only the last
# expression of a cell is echoed; wrap a line in print() to see an earlier one.
sc.version               # Retrieve SparkContext version
sc.pythonVer             # Retrieve Python version
sc.master                # Master URL to connect to
str(sc.sparkHome)        # Path where Spark is installed on worker nodes
str(sc.sparkUser())      # Retrieve name of the Spark user running SparkContext
sc.appName               # Return application name
sc.applicationId         # Retrieve application ID
sc.defaultParallelism    # Return default level of parallelism
sc.defaultMinPartitions  # Default minimum number of partitions for RDDs

### Configuration 

In [None]:
from pyspark import SparkConf, SparkContext

# Build the configuration first, then hand it to the SparkContext constructor.
conf = (SparkConf()
        .setMaster("local")
        .setAppName("My app")
        .set("spark.executor.memory", "1g"))  # memory per executor
# NOTE(review): an `sc` is already created earlier in this notebook; Spark allows
# only one active SparkContext per process, so stop it (sc.stop()) before this cell.
sc = SparkContext(conf=conf)


### Using the Shell 

In the PySpark shell, a special interpreter-aware SparkContext is already created in the variable called sc.
Set which master the context connects to with the `--master` argument, and add Python .zip, .egg or .py files to the runtime path by passing a comma-separated list to `--py-files`.

In [None]:
# Run these from a terminal inside the Spark installation directory.
$ ./bin/spark-shell --master local[2]
$ ./bin/pyspark --master local[4] --py-files code.py

## Loading Data 


##### Parallelized Collections 

In [None]:
# Pair RDDs used by the examples that follow.
rdd = sc.parallelize([('a', 7), ('a', 2), ('b', 2)])
rdd2 = sc.parallelize([('a', 2), ('d', 1), ('b', 1)])
# The integers 0..99.
rdd3 = sc.parallelize(range(100))
# Keys mapped to lists of values. Bound to its own name (rdd4) so that it does
# not replace `rdd`, which the later examples expect to hold the (key, int) pairs.
rdd4 = sc.parallelize([("a", ["x", "y", "z"]),
                       ("b", ["p", "r"])])

##### External Data
Read either one text file from HDFS, a local file system or any Hadoop-supported file system URI with textFile(), or read in a directory of text files with wholeTextFiles(). 


In [None]:
# Read every file matching the glob pattern as lines of text.
textFile = sc.textFile("/my/directory/*.txt")
# Read a whole directory of text files.
textFile2 = sc.wholeTextFiles("/my/directory/")

### Inferring the Schema Using Reflection

In [None]:
from pyspark.sql import Row

sc = spark.sparkContext

# Load a text file and convert each line to a Row.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))

# Infer the schema, and register the DataFrame as a table.
schemaPeople = spark.createDataFrame(people)
schemaPeople.createOrReplaceTempView("people")

# SQL can be run over DataFrames that have been registered as a table.
teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

# The results of SQL queries are Dataframe objects.
# rdd returns the content as an :class:`pyspark.RDD` of :class:`Row`.
teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name).collect()
for name in teenNames:
    print(name)
# Name: Justin

### Programmatically Specifying the Schema

In [None]:
# Import data types
from pyspark.sql.types import *

sc = spark.sparkContext

# Load a text file and convert each line to a Row.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
# Each line is converted to a tuple.
people = parts.map(lambda p: (p[0], p[1].strip()))

# The schema is encoded in a string.
schemaString = "name age"

fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)

# Apply the schema to the RDD.
schemaPeople = spark.createDataFrame(people, schema)

# Creates a temporary view using the DataFrame
schemaPeople.createOrReplaceTempView("people")

# SQL can be run over DataFrames that have been registered as a table.
results = spark.sql("SELECT name FROM people")

results.show()
# +-------+
# |   name|
# +-------+
# |Michael|
# |   Andy|
# | Justin|
# +-------+

## Retrieving RDD Information 


#### Basic Information 

In [None]:
# Expected results are shown in "# ->" comments.
rdd.getNumPartitions()        # List the number of partitions
rdd.count()                   # Count RDD instances  -> 3
rdd.countByKey()              # Count RDD instances by key
# -> defaultdict(<class 'int'>, {'a': 2, 'b': 1})
rdd.countByValue()            # Count RDD instances by value
# -> defaultdict(<class 'int'>, {('b', 2): 1, ('a', 2): 1, ('a', 7): 1})
rdd.collectAsMap()            # Return (key, value) pairs as a dictionary
# -> {'a': 2, 'b': 2}
rdd3.sum()                    # Sum of RDD elements  -> 4950
sc.parallelize([]).isEmpty()  # Check whether RDD is empty
# -> True


#### Summary 


In [None]:
# Expected results are shown in "# ->" comments.
rdd3.max()         # Maximum value of RDD elements        -> 99
rdd3.min()         # Minimum value of RDD elements        -> 0
rdd3.mean()        # Mean value of RDD elements           -> 49.5
rdd3.stdev()       # Standard deviation of RDD elements   -> 28.866070047722118
rdd3.variance()    # Compute variance of RDD elements     -> 833.25
rdd3.histogram(3)  # Compute histogram by bins            -> ([0, 33, 66, 99], [33, 33, 34])
rdd3.stats()       # Summary statistics (count, mean, stdev, max & min)

# DataFrame counterpart, like pandas' describe().
# NOTE(review): assumes a DataFrame named `df` exists; none is created in the RDD section.
df.summary().show()

## Applying Functions 


In [None]:
# Apply a function to each RDD element
rdd.map(lambda x: x + (x[1], x[0])).collect()
# -> [('a', 7, 7, 'a'), ('a', 2, 2, 'a'), ('b', 2, 2, 'b')]

# Apply a function to each RDD element and flatten the result
rdd5 = rdd.flatMap(lambda x: x + (x[1], x[0]))
rdd5.collect()
# -> ['a', 7, 7, 'a', 'a', 2, 2, 'a', 'b', 2, 2, 'b']

# Apply a flatMap function to each (key, value) pair of rdd4 without changing the keys
# (rdd4 is the pair RDD whose values are lists: ("a", ["x","y","z"]), ("b", ["p","r"]))
rdd4.flatMapValues(lambda x: x).collect()
# -> [('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]


## Selecting Data


#### Getting

In [None]:
# Expected results are shown in "# ->" comments.
rdd.collect()  # Return a list with all RDD elements
# -> [('a', 7), ('a', 2), ('b', 2)]
rdd.take(2)    # Take first 2 RDD elements
# -> [('a', 7), ('a', 2)]
rdd.first()    # Take first RDD element
# -> ('a', 7)
rdd.top(2)     # Take top 2 RDD elements
# -> [('b', 2), ('a', 7)]

#### Sampling

In [None]:
# Arguments are (withReplacement, fraction, seed).
rdd3.sample(False, 0.15, 81).collect()  # Return sampled subset of rdd3
# -> [3, 4, 27, 31, 40, 41, 42, 43, 60, 76, 79, 80, 86, 97]

#### Filtering

In [None]:
# Expected results are shown in "# ->" comments.
rdd.filter(lambda x: "a" in x).collect()  # Filter the RDD
# -> [('a', 7), ('a', 2)]
rdd5.distinct().collect()                 # Return distinct RDD values
# -> ['a', 2, 'b', 7]
rdd.keys().collect()                      # Return (key, value) RDD's keys
# -> ['a', 'a', 'b']

## Iterating 

In [None]:
def g(x):
    """Print a single RDD element."""
    print(x)


rdd.foreach(g)  # Apply a function to all RDD elements
# Printed output (the order shown differs from collect()):
# ('a', 7)
# ('b', 2)
# ('a', 2)


## Reshaping Data 


#### Reducing

In [None]:
# Expected results are shown in "# ->" comments.
rdd.reduceByKey(lambda x, y: x + y).collect()  # Merge the rdd values for each key
# -> [('a', 9), ('b', 2)]
rdd.reduce(lambda a, b: a + b)                 # Merge the rdd values
# -> ('a', 7, 'a', 2, 'b', 2)

#### Grouping by

In [None]:
# Return RDD of grouped values.
# Parentheses let the method chain span several lines.
(rdd3.groupBy(lambda x: x % 2)
     .mapValues(list)
     .collect())

# Group rdd by key
(rdd.groupByKey()
    .mapValues(list)
    .collect())
# -> [('a', [7, 2]), ('b', [2])]

#### Aggregating

In [None]:
# seqOp folds one element into a (sum, count) accumulator;
# combOp merges two such accumulators.
seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))

# Aggregate RDD elements of each partition and then the results
rdd3.aggregate((0, 0), seqOp, combOp)
# -> (4950, 100)

# Aggregate values of each RDD key
rdd.aggregateByKey((0, 0), seqOp, combOp).collect()
# -> [('a', (9, 2)), ('b', (2, 1))]

# Aggregate the elements of each partition, and then the results
# (`add` is operator.add, imported in the notebook's import cell)
rdd3.fold(0, add)
# -> 4950

# Merge the values for each key
rdd.foldByKey(0, add).collect()
# -> [('a', 9), ('b', 2)]

# Create tuples of RDD elements by applying a function
rdd3.keyBy(lambda x: x + x).collect()

## Mathematical Operations 


In [None]:
rdd.subtract(rdd2).collect()   # Return each rdd value not contained in rdd2
# -> [('b', 2), ('a', 7)]

# Return each (key, value) pair of rdd2 with no matching key in rdd
rdd2.subtractByKey(rdd).collect()
# -> [('d', 1)]

rdd.cartesian(rdd2).collect()  # Return the Cartesian product of rdd and rdd2


## Sort 

In [None]:
rdd2.sortBy(lambda x: x[1]).collect()  # Sort RDD by given function
# -> [('d', 1), ('b', 1), ('a', 2)]
rdd2.sortByKey().collect()             # Sort (key, value) RDD by key
# -> [('a', 2), ('b', 1), ('d', 1)]


## Union

In [None]:
# Union the two employee RDDs into one and print every element on the driver.
# NOTE(review): assumes `employees` and `employees2` are existing RDDs; no cell
# above this one defines them.
rdd = sc.union([employees, employees2])
for row in rdd.collect():
    print(row)

## Subtract

In [None]:
# subtract() returns an RDD with the elements of the first RDD that are not
# present in the second RDD.
data = [(103, "Mark Choi", "Torrance", "CA"),
        (104, "Janet Reyes", "Rolling Hills", "CA"),
        (105, "Lester Cruz", "Van Nuys", "CA")]
rdd = sc.parallelize(data)
data2 = [(103, "Mark Choi", "Torrance", "CA")]
rdd2 = sc.parallelize(data2)
employees = rdd.subtract(rdd2)
for row in employees.collect():
    print(row)
# (105, 'Lester Cruz', 'Van Nuys', 'CA')
# (104, 'Janet Reyes', 'Rolling Hills', 'CA')

## PARTITION

In [None]:
meuDataset.getNumPartitions()  # result: 10

meuDataset2 = meuDataset.repartition(20)  # increase to 20 partitions
meuDataset3 = meuDataset.coalesce(2)      # reduce to 2 partitions

# If you are already working with a DataFrame, you can convert it to an RDD
# first (via .rdd) and then use the commands above, as shown below:
# NOTE(review): the first line treats meuDataset as an RDD, this one as a DataFrame.
meuDataset.rdd.getNumPartitions()  # result: 10

## Repartitioning 


In [None]:
>>> rdd.repartition(4) #New RDD with 4 partitions
>>> rdd.coalesce(1) #Decrease the number of partitions in the RDD to 1

## Coalesce

In [None]:
# Coalesce reduces the number of partitions in an RDD.
# You might want to use coalesce after performing a filter on a large RDD.
# While filtering reduces the amount of data consumed by the new RDD,
# it inherits the number of partitions of the original RDD.
df.coalesce(1).write.mode("append").parquet("/user/hive/warehouse/Mytable")

# Example 8-11. Coalescing a large RDD in the PySpark shell
# Wildcard input that may match thousands of files
# (named log_rdd rather than `input`, which would shadow the Python builtin)
log_rdd = sc.textFile("s3n://log-files/2014/*.log")
log_rdd.getNumPartitions()
# -> 35154

# A filter that excludes almost all data
lines = log_rdd.filter(lambda line: line.startswith("2014-10-17"))
lines.getNumPartitions()
# -> 35154

# We coalesce the lines RDD before caching
lines = lines.coalesce(5).cache()
lines.getNumPartitions()
# -> 5

# Subsequent analysis can operate on the coalesced RDD...
lines.count()

## Repartition

In [None]:
#Repartition can both decrease and increase the number of partitions in an RDD. You would 
#generally use coalesce when reducing partitions since it’s more efficient than repartition. 
#Increasing the number of partitions is useful for increasing the degree of parallelism when 
#writing to HDFS. In the following example, we’re writing six Parquet files to HDFS.
df.repartition(6).write.mode("append").parquet("/user/hive/warehouse/Mytable")

#Note Coalesce is generally faster than repartition. Repartition will perform a full 
#shuffle, creating new partitions and equally distributing data across worker nodes. 
#Coalesce minimizes data movement and avoids a full shuffle by using existing partitions

## Collect

In [None]:
# collect() returns the entire dataset to the driver program as a Python list.
myCities = sc.parallelize(["tokyo", "new york", "paris", "san francisco"])
myCities.collect()

## Foreach

In [None]:
# Collect the dataset to the driver and run a function (print) on each element.
myCities = sc.parallelize(["tokyo", "new york", "paris", "san francisco"])
for city in myCities.collect():
    print(city)
# tokyo
# new york
# paris
# san francisco

# Saving 

In [None]:
# Save the RDD as text, then through a Hadoop output format class.
rdd.saveAsTextFile("rdd.txt")
rdd.saveAsHadoopFile("hdfs://namenodehost/parent/child",
                     'org.apache.hadoop.mapred.TextOutputFormat')


## Stopping SparkContext 

In [None]:
sc.stop()

## Execution 

In [None]:
$ ./bin/spark-submit examples/src/main/python/pi.py

# PYSPARK - SQL

## Initializing SparkSession

In [None]:
from pyspark.sql import SparkSession

# Build the SparkSession used by all DataFrame / SQL examples below.
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

## Creating DataFrames

### From RDDs

In [None]:
from pyspark.sql.types import*

##### Infer Schema

In [None]:
sc = spark.sparkContext
# Each line of people.txt is split on commas: first field is the name, second the age.
lines = sc.textFile("people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))
# The schema (column names and types) is inferred from the Row objects.
peopledf = spark.createDataFrame(people)

##### Specify Schema


In [None]:
people = parts.map(lambda p: Row(name=p[0],
               age=int(p[1].strip())))
schemaString = "name age"
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)
spark.createDataFrame(people, schema).show()

### From Spark Data Sources

##### JSON

In [None]:
#EX 1
df = spark.read.json("customer.json")
df.show()


In [None]:
# EX 2
df2 = spark.read.load("people.json", format="json")

##### Parquet files

In [None]:
df3 = spark.read.load("users.parquet")

##### TXT files

In [None]:
df4 = spark.read.text("people.txt")

##### CSV

In [None]:
# Reading the data
df = spark.read.csv('/content/car data.csv', header=True, inferSchema="true")

# Shape of the dataset
print('Shape of the dataset: ', (df.count(), len(df.columns)))

# Displaying top n=10 rows
df.show(n=10)

Shape of the dataset:  (301, 9)
+-------------+----+-------------+-------------+----------+---------+-----------+------------+-----+
|     Car_Name|Year|Selling_Price|Present_Price|Kms_Driven|Fuel_Type|Seller_Type|Transmission|Owner|
+-------------+----+-------------+-------------+----------+---------+-----------+------------+-----+
|         ritz|2014|         3.35|         5.59|     27000|   Petrol|     Dealer|      Manual|    0|
|          sx4|2013|         4.75|         9.54|     43000|   Diesel|     Dealer|      Manual|    0|
|         ciaz|2017|         7.25|         9.85|      6900|   Petrol|     Dealer|      Manual|    0|
|      wagon r|2011|         2.85|         4.15|      5200|   Petrol|     Dealer|      Manual|    0|
|        swift|2014|          4.6|         6.87|     42450|   Diesel|     Dealer|      Manual|    0|
|vitara brezza|2018|         9.25|         9.83|      2071|   Diesel|     Dealer|      Manual|    0|
|         ciaz|2015|         6.75|         8.12|     18796|   Petrol|     Dealer|      Manual|    0|
|      s cross|2015|          6.5|         8.61|     33429|   Diesel|     Dealer|      Manual|    0|
|         ciaz|2016|         8.75|         8.89|     20273|   Diesel|     Dealer|      Manual|    0|
|         ciaz|2015|         7.45|         8.92|     42367|   Diesel|     Dealer|      Manual|    0|
+-------------+----+-------------+-------------+----------+---------+-----------+------------+-----+
only showing top 10 rows

##### SQL

In [None]:
df = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")

## Data Description

In [None]:
# Alternatively, displaying the head of the data
df.head(n=5)

# Getting a description of the dataset, similar to pandas df.describe()
df.describe().show()

# Checking for dataframe schema, similar to pandas df.info()
df.printSchema()   

# Checking type of dataframe
print(type(df))

# Checking type of column data
print(type(df.select('Selling_Price')))

# Checking the datatypes of columns:
# Displaying dtypes of columns
df.dtypes

In [None]:
# Filtering out the top 5 rows and displaying them:
df1 = df.limit(5)
df1.show()

## Filter

In [None]:
#Filter entries of age, only keep those records of which the values are >24
df.filter(df["age"]>24).show()
df.filter(df["age"]>24).filter(df['Avg_Salary'] > 500000).show() #more than one filter

# Remove any duration of 0
departures_df = departures_df.filter(departures_df[3] > 0)

In [None]:
# Filtering out the top 5 rows and displaying them, as well as displaying the rest of the dataframe.
# getting the list of Row objects
row_list = df.collect()
  
# Slicing the Python List
part1 = row_list[:5]
part2 = row_list[5:]
  
# Converting the slices to PySpark DataFrames
slice1 = spark.createDataFrame(part1)
slice2 = spark.createDataFrame(part2)
  
# Printing the first slice
print('First DataFrame')
slice1.show()
  
# Printing the second slice
print('Second DataFrame')
slice2.show()

## Where

In [None]:
df.where((df['Avg_Salary'] > 500000) & (df['Number_of_houses'] > 2)).show()

## Duplicate Values 

In [None]:
df = df.dropDuplicates()

## Queries

In [None]:
from pyspark.sql import functions as F

##### Select

In [None]:
df.select("firstName").show()  # Show all entries in firstName column
df.select("firstName", "lastName") \
  .show()

# Show all entries in firstName, age and type
df.select("firstName",
          "age",
          explode("phoneNumber")
          .alias("contactInfo")) \
  .select("contactInfo.type",
          "firstName",
          "age") \
  .show()

# Show all entries in firstName and age, add 1 to the entries of age
df.select(df["firstName"], df["age"] + 1).show()

# Show one boolean column that is true where age > 24 (select does not filter rows)
df.select(df['age'] > 24).show()

# Displaying the values of a particular column
df.select('Selling_Price').show()

# Displaying multiple columns of the dataset:
df.select(['Selling_Price', 'Present_Price']).show()

##### When

In [None]:
# Show firstName and 0 or 1 depending on age > 30
df.select("firstName",
          F.when(df.age > 30, 1)
           .otherwise(0)) \
  .show()

# Return the rows whose firstName is one of the given options
df[df.firstName.isin("Jane", "Boris")] \
  .collect()

##### ISIN - IS IN

In [None]:
# isin with a single argument
# NOTE(review): .display() looks like a notebook-platform helper rather than core
# PySpark (the rest of this notebook uses .show()) — confirm it exists in your environment.
df.filter(df.Marca.isin('Chevrolet')).display()

# isin with a list
lista = ['Chevrolet', 'Bmw', 'Toyota']
df.filter(df.Marca.isin(lista)).display()

# "isin" against a column of another Spark DataFrame — done here as a left semi join
df = base.join(df_marcas, ['Marca'], 'leftsemi')

# IS NOT IN
df.filter(~df.Marca.isin(lista)).display()



##### Like

In [None]:
df.select("firstName", #Show firstName, and lastName is TRUE if lastName is like Smith
              df.lastName.like("Smith")) \
     .show()

##### Startswith - Endswith 

In [None]:
# Show firstName, and TRUE if lastName starts with Sm
df.select("firstName",
          df.lastName
            .startswith("Sm")) \
  .show()

# Show TRUE for last names ending in th
# (a comment may not follow a line-continuation backslash, so it sits above the statement)
df.select(df.lastName.endswith("th")) \
  .show()

##### Substring 

In [None]:
# Return substrings of firstName (3 characters starting at position 1)
df.select(df.firstName.substr(1, 3)
            .alias("name")) \
  .collect()

# Slice strings - keeps 7 characters of NOME starting at position 0
# NOTE(review): Spark substring positions are documented as 1-based; confirm that a
# start of 0 gives the intended first 7 characters.
df = df.withColumn('NOME', F.substring(F.col('NOME'), 0, 7))

##### Between 

In [None]:
# Show age: values are TRUE if between 22 and 24
df.select(df.age.between(22, 24)) \
  .show()

##### Lpad

In [None]:
# Left-pads the CPF column with the text "CPF " in rows that have a value.
# A CPF has 11 digits by default, so the target width is 11 + 4 = 15.
df = df.withColumn('CPF', F.lpad(F.col('CPF'), 15, "CPF ")) 

## Sample

In [None]:
# no replace
sampleWithoutKeyConsideration = noDuplicateDf1.sample(withReplacement=False, fraction=0.5, seed=200)

# with replace
sampleWithoutKeyConsideration1 = noDuplicateDf1.sample(withReplacement=True, fraction=0.5, seed=200)

## Find Frequent Items

In [None]:
duplicateDataDf.freqItems(cols=['iv1']).show()
duplicateDataDf.freqItems(cols=['iv1','iv2']).show()

## Add, Update & Remove Columns 

##### Adding Columns

In [None]:
df = df.withColumn('city',df.address.city) \
            .withColumn('postalCode',df.address.postalCode) \
            .withColumn('state',df.address.state) \
            .withColumn('streetAddress',df.address.streetAddress) \
            .withColumn('telePhoneNumber', explode(df.phoneNumber.number)) \
            .withColumn('telePhoneType', explode(df.phoneNumber.type)) 

In [None]:
# Adding a new column to the dataset
df1 = df.withColumn("Car New", df['Present_Price']*2)

##### Updating Columns

In [None]:
df = df.withColumnRenamed('telePhoneNumber', 'phoneNumber')

##### Removing Columns

In [None]:
# EX 1
df = df.drop("address", "phoneNumber")

# EX 2
df = df.drop(df.address).drop(df.phoneNumber)

##### Concat

In [None]:
# EX 1 - concatenate the columns with no separator
df.select(concat(df.firstname, df.middlename, df.lastname).alias("FullName"))
# SILVIOSANTOSSILVA

# EX 2 - concatenate the columns with "_" between the parts
df.select(concat_ws('_', df.firstname, df.middlename, df.lastname).alias("FullName"))
# SILVIO_SANTOS_SILVA

# Ex 3 - Unir as trasncr
newDataframe = base.groupBy('filename').agg(F.concat_ws('. ', F.collect_list(base.dialog)).alias('dialog'))

#Customer
#dtCustomer = dataframe[dataframe['speaker']=='Customer'].groupby([coluna_grouped], as_index=False)[coluna_dialog].agg({coluna_dialog: '. '.join})
dtCustomer = base.filter(base['speaker']=='Customer').groupBy('filename').agg(F.concat_ws('. ', F.collect_list(base.dialog)).alias('dialog_Customer'))
    
#Vendor
#dtVendor = dataframe[dataframe['speaker']=='Vendor'].groupby([coluna_grouped], as_index=False)[coluna_dialog].agg({coluna_dialog: '. '.join})
dtVendor = base.filter(base['speaker']=='Vendor').groupBy('filename').agg(F.concat_ws('. ', F.collect_list(base.dialog)).alias('dialog_Vendor'))

In [None]:
# Rename the join keys of the per-speaker frames (dtCustomer / dtVendor) so the
# joined results do not end up with two columns called "filename".
dtcustomer = dtCustomer.withColumnRenamed('filename', 'filename_c')
dtvendor = dtVendor.withColumnRenamed('filename', 'filename_a')

# Attach each speaker's dialog to the full dialog, matching on the file name.
ls_customer = newDataframe.join(dtcustomer, newDataframe['filename'] == dtcustomer['filename_c'], 'inner')
ls_vendor = newDataframe.join(dtvendor, newDataframe['filename'] == dtvendor['filename_a'], 'inner')
ls_vendor = ls_vendor.withColumnRenamed('filename', 'filename_ls_a').withColumnRenamed('dialog', 'dialog_ls_a')
# One row per file: full dialog, customer dialog and vendor dialog.
newDataframe2 = ls_customer.join(ls_vendor, ls_customer['filename'] == ls_vendor['filename_ls_a'], 'inner').select('filename', 'dialog', 'dialog_customer', 'dialog_vendor')

## Missing & Replacing Values

In [None]:
df.na.fill(50).show()                 # Replace null values
df.na.drop().show()                   # Return new df omitting rows with null values
df.na.drop(subset='country').show()   # Drop rows where 'country' is null
df.fillna('0').show()                 # Replace nulls with the string '0'
df.fillna({'country': 'USA', 'browser': 'Safari'}).show()  # Per-column replacement values

# Return new df replacing one value with another
df.na \
  .replace(10, 20) \
  .show()

In [None]:
# If the thresh value is set to 2, any row containing less than two non-null values will be dropped.
missingDf.dropna(how ='all',thresh=2).show()


In [None]:
# PySpark doesn't have the sophisticated function like Pandas to check for null values.
# But we have used a custom code to check for null values in a dataframe.
# Creating a dataframe to check null value counts
null_df = df.select([count(when(col(c).contains('None') | \
                            col(c).contains('NULL') | \
                            (col(c) == '') | \
                            col(c).isNull() | \
                            isnan(c), c 
                           )).alias(c)
                    for c in df.columns])

# Displaying the null value counts dataframe
null_df.show()
+--------+----+-------------+-------------+----------+---------+-----------+------------+-----+
|Car_Name|Year|Selling_Price|Present_Price|Kms_Driven|Fuel_Type|Seller_Type|Transmission|Owner|
+--------+----+-------------+-------------+----------+---------+-----------+------------+-----+
|       0|   0|            0|            0|         0|        0|          0|           0|    0|
+--------+----+-------------+-------------+----------+---------+-----------+------------+-----+

## Round

In [None]:
# Round swimmerSpeed to 3 decimal places. F.round is spelled out so the Spark SQL
# function is used whether or not the star import has replaced the builtin round().
df = df.withColumn('swimmerSpeed', F.round(df.swimmerSpeed, 3))

### Datetime

In [None]:
# Converting Year to datetime
df1 = df.withColumn("Year", F.to_date(F.col("Year").cast("string"), 'yyyy'))

# Extracting Year
df = df1.withColumn('Year', F.year(F.to_timestamp('Year', 'yyyy')))
df.show()

# Add a column with the current date and time, formatted as text
df = df.withColumn('Data Atualizacao', F.date_format(F.current_timestamp(), 'dd MMM yyyy | HH:mm:ss'))
# Data Atualizacao
# 08 Oct 2022 | 07:41:11

## Lit
The need to create a new column with a constant value can be very common.

In [None]:
# Add a column holding the same literal value in every row.
df = df.withColumn('constant', F.lit('finance'))
df.select('Customer_subtype', 'constant').show()

# [Out]: Customer_subtype, Constant
#        Rich, finance
#        Billion, finance
#        Million, finance

## GroupBy 

In [None]:
# Group by age, count the members in the groups
df.groupBy("age") \
  .count() \
  .show()

In [None]:
df.groupBy('Customer_subtype').count().show()

In [None]:
# We can apply any type of sorting to the final results.
# Because we have seven columns in the dataframe — all are categorical columns
# except for one (Avg_Salary) — we can iterate over each column and apply
# aggregation as in the following example.
# The loop variable is not called `col`, so the imported pyspark `col()` function
# stays usable in later cells.
for column_name in df.columns:
    if column_name != 'Avg_Salary':
        print(f" Aggregation for {column_name}")
        df.groupBy(column_name).count().orderBy('count', ascending=False).show(truncate=False)

In [None]:
df.groupBy('Customer_main_type').agg(F.mean('Avg_Salary')).show() # F.mean
df.groupBy('Customer_main_type').agg(F.max('Avg_Salary')).show() #F.max
df.groupBy('Customer_main_type').agg(F.min('Avg_Salary')).show()
df.groupBy('Customer_main_type').agg(F.sum('Avg_Salary')).show()
df.sort("Avg_Salary", ascending=False).show()

df.groupBy('Customer_subtype').agg(F.avg('Avg_Salary').alias('mean_salary'))\
    .orderBy('mean_salary',ascending=False).show(50,False)

df.groupBy('Customer_subtype').agg(F.max('Avg_Salary')\
    .alias('max_salary')).orderBy('max_salary',ascending=False).show()

## Collect
Collect list provides all the values in the original order of occurrence 
(they can be reversed as well), and collect set provides only the unique values

In [None]:
# collect_list keeps every value (duplicates included)
df.groupby("Customer_subtype").agg(F.collect_list("Number_of_houses")).show()
# [Out]: Customer, [2,1,2,2,2,1,2]

# collect_set keeps only the unique values
df.groupby("Customer_subtype").agg(F.collect_set("Number_of_houses")).show()
# [Out]: Customer, [1,2]

## Sort 

In [None]:
peopledf.sort(peopledf.age.desc()).collect()
df.sort("age", ascending=False).collect()
df.orderBy(["age","city"],ascending=[0,1]).collect()
#Sorting on Two Columns in Different Order
swimmerDfSorted3 = swimmerDf.orderBy("Occupation","swimTimeInSecond", ascending=[False,True])

#Performing a Partition-Wise Sort (SinglePartition Case)
swimmerDf.rdd.getNumPartitions()
sortedPartitons = swimmerDf.sortWithinPartitions("Occupation","swimTimeInSecond", ascending=[False,True])

swimmerDf1 = swimmerDf.repartition(2)
swimmerDf1.rdd.glom().collect()

## Join

In [None]:
# PySpark offers a very convenient way to merge and pivot your dataframe values.
# The idea is to combine this dataframe with the original dataframe, so as to have these region
# codes as part of the original dataframe, as a column.
region_data = spark.createDataFrame(
    [('Family with grownups', 'PN'),
     ('Driven Growers', 'GJ'),
     ('Conservative families', 'DD'),
     ('Cruising Seniors', 'DL'),
     ('Average Family ', 'MN'),
     ('Living well', 'KA'),
     ('Successful hedonists', 'JH'),
     ('Retired and Religious', 'AX'),
     ('Career Loners', 'HY'),
     ('Farmers', 'JH')],
    schema=StructType().add("Customer_main_type", "string").add("Region Code", "string"))

# Join on the shared Customer_main_type column, then count rows per region code.
new_df = df.join(region_data, on='Customer_main_type')
new_df.groupby("Region Code").count().show()
                                    
                                    
# Two DataFrames
innerDf = studentsDf.join(subjectsDf, studentsDf.studentid == subjectsDf.studentid, how= "inner")
leftOuterDf = studentsDf.join(subjectsDf, studentsDf.studentid == subjectsDf.studentid, how= "left")
rightOuterDf = studentsDf.join(subjectsDf, studentsDf.studentid == subjectsDf.studentid, how= "right")
outerDf = studentsDf.join(subjectsDf, studentsDf.studentid == subjectsDf.studentid, how= "outer")
                                    
# FAZENDO O JOIN ENTRE DOIS DATAFRAMES
df_final = df_2.join(df, df_2['cd_customer'] == df['cd_customer'], "inner").filter(F.col('soma_data') >= F.current_date())
                                    
                                    
# isin usando uma coluna de um sdf ... neste caso é um semileft join
df = base.join(df_marcas_premium, ['Marca'], 'leftsemi')                                    


## Pivoting

In [None]:
df.groupBy('Customer_main_type').pivot('Avg_age').sum('Avg_Salary').fillna(0).show()
df.groupBy('Customer_main_type').pivot('label').sum('Avg_Salary').fillna(0).show()

## Window Functions or Windowed Aggregates

In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col,row_number

win = Window.orderBy(df['Avg_Salary'].desc())
df=df.withColumn('rank', row_number().over(win).alias('rank'))
df.show()

#One common requirement is to find the top-three values from a category.
#In the following example, we define the window and partition by the Customer 
#subtype column. Basically, what it does is sort the Avg Salary for each 
#of the Customer subtype category, so now we can use filter to fetch the 
#top-three salary values for each group
win_1=Window.partitionBy("Customer_subtype").orderBy(df['Avg_Salary'].desc())
df=df.withColumn('rank', row_number().over(win_1).alias('rank'))
df.groupBy('rank').count().orderBy('rank').show()
df.filter(col('rank') < 4).show()

## Crosstab

In [None]:
# surveyDf has a Gender and a Vote column; its first rows look like this:
# +------+---------+
# |Gender|     Vote|
# +------+---------+
# |  Male|      Yes|
# |  Male|      Yes|
# |  Male|       No|
# |  Male|DoNotKnow|
# |  Male|      Yes|

# Contingency table: one row per Gender value, one column per Vote value.
surveyDf.crosstab("Gender", "Vote").show()
# +-----------+---------+--+---+
# |Gender_Vote|DoNotKnow|No|Yes|
# +-----------+---------+--+---+
# |       Male|        2| 5|  5|
# |     Female|        1| 1|  6|
# +-----------+---------+--+---+

## Repartitioning 

In [None]:
# df with 10 partitions
df.repartition(10) \
  .rdd \
  .getNumPartitions()

# df with 1 partition
df.coalesce(1).rdd.getNumPartitions()

## Running Queries Programmatically 

In [None]:
# Create a Temp View from a DataFrame - to start using Spark SQL in memory

#### Registering DataFrames as Views

In [None]:
# Global temporary view: shared across the sessions of this Spark application and
# kept until the application ends (it is not persisted to disk). It lives in the
# reserved `global_temp` database and must be queried with that prefix.
peopledf.createGlobalTempView("people")
df.createTempView("customer")           # Create a new session-scoped view
df.createOrReplaceTempView("customer")  # Create the view, replacing any view with the same name

sqlDF = spark.sql("SELECT * FROM global_temp.people")


# EX
parquetFile = spark.read.parquet("people.parquet")
parquetFile.createOrReplaceTempView("parquetFile")
teenagers = spark.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.show()

In [None]:
# Verify which tables / views are registered.
# SparkSession has no tableNames(); the catalog API lists them instead.
spark.catalog.listTables()

##### Query Views

In [None]:
# Keep the DataFrame and display it in two steps: .show() returns None, so
# assigning its result would leave the variable empty.
df5 = spark.sql("SELECT * FROM customer")
df5.show()
peopledf2 = spark.sql("SELECT * FROM global_temp.people")
peopledf2.show()

##### Metadata Refreshing


In [None]:
spark.catalog.refreshTable("my_table")

## Inspect Data

In [None]:
df.dtypes              # Return df column names and data types
df.show()              # Display the content of df
df.head()              # Return the first row (head(n) returns the first n rows)
df.first()             # Return first row
df.take(2)             # Return the first n rows
df.schema              # Return the schema of df
df.describe().show()   # Compute summary statistics
df.columns             # Return the columns of df
df.count()             # Count the number of rows in df
df.distinct().count()  # Count the number of distinct rows in df
df.printSchema()       # Print the schema of df
df.explain()           # Print the (logical and physical) plans

## Output

##### Data Structures 


In [None]:
rdd1 = df.rdd #Convert df into an RDD
df.toJSON().first() #Convert df into a RDD of string
df.toPandas() #Return the contents of df as Pandas DataFrame

##### Write & Save to Files 

In [None]:
df.select("firstName", "city")\
       .write \
       .save("nameAndCity.parquet")
df.select("firstName", "age") \
       .write \
       .save("namesAndAges.json",format="json")

In [None]:
# to parquet
df_csv = spark.read.csv('singlelargefile.csv')
df_csv.write.parquet('data.parquet')
df_csv.write.parquet(path='parqData')

In [None]:
# to csv
df.write.csv(path='csvFileDir', header=True,sep=',')

In [None]:
# From a CSV file to a table in the "Sand" schema.
# `pd` is pandas, imported in the notebook's import cell. The pandas frame gets its
# own name so BASE_VQG_HORA only ever refers to the Spark DataFrame.
base_vqg_hora_pd = pd.read_csv('SEU_CSV.csv', sep=';', encoding='UTF-8')
BASE_VQG_HORA = spark.createDataFrame(base_vqg_hora_pd)  # convert the pandas DataFrame to a Spark DataFrame
BASE_VQG_HORA.createOrReplaceTempView('BASE_VQG_HORA')
BASE_VQG_HORA.show()

# Save selected columns of a DataFrame as a table.
NOME_DATA_FRAME.select('Coluna1', 'Coluna2').write.saveAsTable('sand_intelig_atendimento.nome_tabela_lake')


In [None]:
#AZURE - Write to a new SQL table
import os
from getpass import getpass

server_name = "jdbc:sqlserver://{SERVER_ADDR}"
database_name = "database_name"
url = server_name + ";" + "databaseName=" + database_name + ";"

table_name = "table_name"
username = "username"
# Never keep a real password in the notebook: take it from the environment,
# or prompt for it when the variable is not set.
password = os.environ.get("SQLSERVER_PASSWORD") or getpass("SQL Server password: ")


def write_to_sql_server(mode):
    """Write df to table_name through the SQL Server Spark connector.

    mode is the Spark save mode: "overwrite" replaces the table contents,
    "append" adds the rows of df to the existing table.
    A ValueError raised by the write is reported on stdout instead of propagating.
    """
    try:
        df.write \
            .format("com.microsoft.sqlserver.jdbc.spark") \
            .mode(mode) \
            .option("url", url) \
            .option("dbtable", table_name) \
            .option("user", username) \
            .option("password", password) \
            .save()
    # NOTE(review): connector/JDBC failures may surface as exception types other
    # than ValueError -- confirm which ones should be reported here.
    except ValueError as error:
        print("Connector write failed", error)


write_to_sql_server("overwrite")

# Append to the SQL table
write_to_sql_server("append")

# Stopping SparkSession 


In [None]:
# Stop the session; cells that use `spark` after this point need a new session
spark.stop()

# Analysis

In [None]:
# Analysis Calculations (UDF)
def getAvgSale(saleslist):
  """Return the average of the values at positions 2 and 3 of every sale.

  saleslist: iterable of indexable records; each record contributes two
  values (sale[2] and sale[3]) to the average.
  Returns None when saleslist is None or holds no records, so no division
  by zero can occur.
  """
  if saleslist is None:
    return None
  totalsales = 0
  count = 0
  for sale in saleslist:
    totalsales += sale[2] + sale[3]
    # two values were added for this sale
    count += 2
  if count == 0:
    return None
  return totalsales / count

# Wrap getAvgSale as a Spark UDF whose declared return type is DoubleType
udfGetAvgSale = udf(getAvgSale, DoubleType())
# Add an 'avg_sale' column computed by the UDF from the sales_list column
df = df.withColumn('avg_sale', udfGetAvgSale(df.sales_list))

In [None]:
# Analysis Calculations (Inline)
# Reading goes through the SparkSession: a DataFrame has no .read attribute.
# header=True keeps the column names referenced below; inferSchema=True gives
# the numeric columns numeric types instead of strings.
df = spark.read.csv('datafile', header=True, inferSchema=True)
# Column arithmetic is evaluated by Spark itself, no UDF required
df = df.withColumn('avg', (df.total_sales / df.sales_count))
df = df.withColumn('sq_ft', df.width * df.length)
# NOTE(review): udfComputeTotal is not defined in this part of the notebook --
# it must be registered before this cell runs.
df = df.withColumn('total_avg_size', udfComputeTotal(df.entries) / df.numEntries)

### Spark2Pandas

In [None]:
# A bare ".toPandas()" is not valid Python; call it on a Spark DataFrame.
# The whole DataFrame is collected on the driver, so use it on small data only.
df.toPandas()

### Pandas2Spark

In [None]:
# A bare ".createDataFrame()" is not valid Python; it is a SparkSession method
# that takes the pandas DataFrame to convert (here: a round trip of df).
spark.createDataFrame(df.toPandas())

### SQL

In [None]:
# A SQL query (using the spark.sql() method) cannot reference a DataFrame
# variable directly: that throws an error. To access the data this way, first
# register the DataFrame as a temporary view.
# df.createTempView("df_view") raises if the view already exists, whereas
# createOrReplaceTempView() overwrites it, which makes the cell safe to re-run.
df.createOrReplaceTempView("df_view")
spark.sql("SELECT * FROM df_view").show()

# Using SQL expressions
df = df.withColumn("comprar", F.expr("CASE WHEN Ano < 2020 THEN 'sim' ELSE 'nao' END"))

## User-Defined Functions (UDFs)

In [None]:
from pyspark.sql.functions import udf
# Show how many rows fall under each distinct Avg_age value
df.groupby("Avg_age").count().show()

# Adding a Column of Categorical Variables with Labels
def age_category(age):
    """Map an age-range label to a category name.

    "20-30 years" -> "Young", "30-40 years" -> "Mid Aged",
    "40-50 years" / "50-60 years" -> "Old", anything else -> "Very Old".
    """
    categories = (
        (("20-30 years",), "Young"),
        (("30-40 years",), "Mid Aged"),
        (("40-50 years", "50-60 years"), "Old"),
    )
    for age_ranges, label in categories:
        if age in age_ranges:
            return label
    # every value not listed above falls through to the last category
    return "Very Old"

# Wrap age_category as a Spark UDF whose declared return type is StringType
age_udf=udf(age_category,StringType())
# Derive the 'age_category' column from the 'Avg_age' column
df=df.withColumn('age_category',age_udf(df['Avg_age']))
# Show the source and derived columns side by side, then the row count per category
df.select('Avg_age','age_category').show()
df.groupby("age_category").count().show()

In [None]:
# UDF to format strings: capitalize the value and pass nulls through untouched
# (str(None).capitalize() would turn every null into the literal text "None").
udf_capitalize = F.udf(lambda x: None if x is None else str(x).capitalize(), StringType())
base = base.withColumn("dialog", udf_capitalize("dialog"))

## Pandas UDF

In [None]:
from pyspark.sql.functions import pandas_udf, PandasUDFType

# Bounds used for the min-max scaling below
min_sal = 1361
max_sal = 48919896


def scaled_salary(salary):
    """Min-max scale salary against min_sal and max_sal.

    min_sal maps to 0 and max_sal maps to 1.
    """
    salary_range = max_sal - min_sal
    return (salary - min_sal) / salary_range

# Wrap scaled_salary as a pandas UDF whose declared return type is DoubleType
scaling_udf = pandas_udf(scaled_salary, DoubleType())
# Add the scaled column from 'Avg_Salary' and show 10 rows without truncating values
df.withColumn("scaled_salary",scaling_udf(df['Avg_Salary'])).show(10,False)

## Koalas

In [None]:
# import pandas as pd
import databricks.koalas as pd

# The koalas library implements most of the pandas functions,
# so just swap the import and keep using your pandas code.

# DIVERSOS

### UNION - Coalesce

In [None]:

Problem:
+----+-----+-----+-----+-----+-----+---+------+------+------+
| ID | in1 | in2 | in3 | in4 | in5 | / | out1 | out2 | out3 |
+----+-----+-----+-----+-----+-----+---+------+------+------+
|  1 |     |     | C   |     |     | / | C    |      |      |
|  2 | A   |     | C   |     | E   | / | A    | C    | E    |
|  3 | A   | B   | C   |     |     | / | A    | B    | C    |
|  4 | A   | B   | C   | D   | E   | / | A    | B    | C    |
|  5 |     |     |     |     |     | / |      |      |      |
|  6 |     | B   |     |     | E   | / | B    | E    |      |
|  7 |     | B   |     | D   | E   | / | B    | D    | E    |
+----+-----+-----+-----+-----+-----+---+------+------+------+

Solution:
    
import pyspark.sql.functions as f

# Load the CSV, taking column names from the header row and inferring the types
df = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("test.csv")
)

# Every column except ID is treated as an input column
cols = df.columns
cols.remove('ID')

# Pack the inputs into one array and subtract an array holding only null, which
# leaves the non-null values (per the Spark docs array_except also drops duplicates)
all_inputs = f.array(*cols)
only_null = f.array(f.lit(None))
df2 = df.withColumn('ins', f.array_except(all_inputs, only_null))

# out1..out3 take the first three elements of 'ins'
for position in range(1, 4):
    df2 = df2.withColumn('out' + str(position), f.col('ins')[position - 1])

df2.show(10, False)

+---+----+----+----+----+----+---------------+----+----+----+
|ID |in1 |in2 |in3 |in4 |in5 |ins            |out1|out2|out3|
+---+----+----+----+----+----+---------------+----+----+----+
|1  |null|null|C   |null|null|[C]            |C   |null|null|
|2  |A   |null|C   |null|E   |[A, C, E]      |A   |C   |E   |
|3  |A   |B   |C   |null|null|[A, B, C]      |A   |B   |C   |
|4  |A   |B   |C   |D   |E   |[A, B, C, D, E]|A   |B   |C   |
|5  |null|null|null|null|null|[]             |null|null|null|
|6  |null|B   |null|null|E   |[B, E]         |B   |E   |null|
|7  |null|B   |null|D   |E   |[B, D, E]      |B   |D   |E   |
+---+----+----+----+----+----+---------------+----+----+----+

#### Calcular o Tempo

In [None]:
import requests
import time

def calcular_tempo(funcao):
    """Decorator that prints how long each call to funcao takes.

    The wrapper forwards every positional and keyword argument to funcao and
    returns funcao's result, so decorating a function does not change how it
    is called. The elapsed time, in seconds, is printed after the call returns.
    """
    import functools  # local import: this cell only imports requests and time

    @functools.wraps(funcao)  # keep the wrapped function's name and docstring
    def wrapper(*args, **kwargs):
        # perf_counter is a monotonic clock meant for measuring durations
        tempo_inicial = time.perf_counter()
        resultado = funcao(*args, **kwargs)
        tempo_final = time.perf_counter()
        print(f"Duração foi de {tempo_final-tempo_inicial} segundos")
        return resultado
    return wrapper

@calcular_tempo
def pegar_cotacao_dolar():
    """Fetch the latest USD-BRL quote from the AwesomeAPI endpoint and print its bid.

    Raises requests.HTTPError when the server answers with an error status and
    requests.Timeout when it does not answer within 10 seconds.
    """
    link = "https://economia.awesomeapi.com.br/last/USD-BRL"
    # A timeout keeps the cell from hanging forever on an unresponsive server
    resposta = requests.get(link, timeout=10)
    # Fail loudly on an HTTP error instead of trying to parse an error page
    resposta.raise_for_status()
    cotacao = resposta.json()
    print(cotacao['USDBRL']['bid'])

pegar_cotacao_dolar()