<a href="https://colab.research.google.com/github/cantaruttim/Learning_PySpark/blob/main/DataFrame_with_Apache_Spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

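This notebook follows the [PySpark DataFrame quickstart](https://spark.apache.org/docs/latest/api/python/getting_started/quickstart_df.html).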

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285397 sha256=9499e58912fbef07a33d63a7869313e525c18e221748d6be75e584b9b781b411
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


## DataFrame

In [2]:
from pyspark.sql import SparkSession

# Obtain the session used by every cell below: get the existing one or create it.
session_builder = SparkSession.builder
spark = session_builder.getOrCreate()
spark

In [3]:
# Build a PySpark DataFrame from a list of Row objects; no schema is passed,
# so the column types shown below come from the Python values themselves.

from datetime import datetime, date
import pandas as pd
from pyspark.sql import Row

sample_rows = [
    Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
    Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
    Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0)),
]
df = spark.createDataFrame(sample_rows)
df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

In [4]:
# Print the DataFrame rows as a text table.
df.show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  4|5.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+



In [8]:
# Print each column's name, type and nullability as a tree.
df.printSchema()

root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: date (nullable = true)
 |-- e: timestamp (nullable = true)



In [5]:
# Create a PySpark DataFrame from plain tuples, with column names and types
# supplied through an explicit schema string.

explicit_schema = 'a long, b double, c string, d date, e timestamp'
typed_rows = [
    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0)),
]
df = spark.createDataFrame(typed_rows, schema=explicit_schema)
df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

In [9]:
# Print the schema of the DataFrame built with the explicit schema string.
df.printSchema()

root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: date (nullable = true)
 |-- e: timestamp (nullable = true)



In [6]:
# Create a PySpark DataFrame from a pandas DataFrame

column_data = {
    'a': [1, 2, 3],
    'b': [2., 3., 4.],
    'c': ['string1', 'string2', 'string3'],
    'd': [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)],
    'e': [datetime(2000, 1, 1, 12, 0), datetime(2000, 1, 2, 12, 0), datetime(2000, 1, 3, 12, 0)],
}
pandas_df = pd.DataFrame(column_data)

# Convert the pandas DataFrame into a PySpark DataFrame.
df = spark.createDataFrame(pandas_df)
df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

In [7]:
# Print the rows of the DataFrame that was created from pandas.
df.show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+



In [10]:
# Print the schema of the DataFrame that was created from pandas.
df.printSchema()

root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: date (nullable = true)
 |-- e: timestamp (nullable = true)



In [11]:
spark.conf.set('spark.sql.repl.eagerEval.enabled', True) # changes how the DataFrame is displayed
### the rendering closely resembles a pandas DataFrame
df

a,b,c,d,e
1,2.0,string1,2000-01-01,2000-01-01 12:00:00
2,3.0,string2,2000-02-01,2000-01-02 12:00:00
3,4.0,string3,2000-03-01,2000-01-03 12:00:00


In [14]:
# Print the first three rows, one record per block with one field per line.
df.show(n=3, vertical=True)

-RECORD 0------------------
 a   | 1                   
 b   | 2.0                 
 c   | string1             
 d   | 2000-01-01          
 e   | 2000-01-01 12:00:00 
-RECORD 1------------------
 a   | 2                   
 b   | 3.0                 
 c   | string2             
 d   | 2000-02-01          
 e   | 2000-01-02 12:00:00 
-RECORD 2------------------
 a   | 3                   
 b   | 4.0                 
 c   | string3             
 d   | 2000-03-01          
 e   | 2000-01-03 12:00:00 



In [19]:
## Show the summary of data
# The timestamp column "e" is selected, but the printed summary only has a, b and c.
summary_columns = ["a", "b", "c", "e"]
df.select(*summary_columns).describe().show()

+-------+---+---+-------+
|summary|  a|  b|      c|
+-------+---+---+-------+
|  count|  3|  3|      3|
|   mean|2.0|3.0|   null|
| stddev|1.0|1.0|   null|
|    min|  1|2.0|string1|
|    max|  3|4.0|string3|
+-------+---+---+-------+



In [20]:
# Return all rows as a Python list of Row objects.
df.collect()

[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1), e=datetime.datetime(2000, 1, 1, 12, 0)),
 Row(a=2, b=3.0, c='string2', d=datetime.date(2000, 2, 1), e=datetime.datetime(2000, 1, 2, 12, 0)),
 Row(a=3, b=4.0, c='string3', d=datetime.date(2000, 3, 1), e=datetime.datetime(2000, 1, 3, 12, 0))]

In [21]:
# In order to avoid throwing an out-of-memory exception, use DataFrame.take() or DataFrame.tail().
# take(2) returns only the first two rows as a list.

df.take(2)

[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1), e=datetime.datetime(2000, 1, 1, 12, 0)),
 Row(a=2, b=3.0, c='string2', d=datetime.date(2000, 2, 1), e=datetime.datetime(2000, 1, 2, 12, 0))]

In [22]:
# tail(1) returns the last row as a one-element list.
df.tail(1)

[Row(a=3, b=4.0, c='string3', d=datetime.date(2000, 3, 1), e=datetime.datetime(2000, 1, 3, 12, 0))]

In [23]:
## A PySpark DataFrame can be converted to a pandas DataFrame

df.toPandas()

  series = series.astype(t, copy=False)


Unnamed: 0,a,b,c,d,e
0,1,2.0,string1,2000-01-01,2000-01-01 12:00:00
1,2,3.0,string2,2000-02-01,2000-01-02 12:00:00
2,3,4.0,string3,2000-03-01,2000-01-03 12:00:00


## Selecting and Accessing Data

In [24]:
# Attribute access yields a Column object, not the column's values.
df.e

Column<'e'>

In [25]:
# Project only column "c" and print it.
df.select("c").show()

+-------+
|      c|
+-------+
|string1|
|string2|
|string3|
+-------+



In [26]:
## Adding a new column
from pyspark.sql.functions import upper

# Upper-case version of column "c", shown as the extra column 'Upper_test'.
uppercased_c = upper(df['c'])
df.withColumn('Upper_test', uppercased_c).show()

+---+---+-------+----------+-------------------+----------+
|  a|  b|      c|         d|                  e|Upper_test|
+---+---+-------+----------+-------------------+----------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|   STRING1|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|   STRING2|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|   STRING3|
+---+---+-------+----------+-------------------+----------+



In [27]:
# To select a subset of rows, use DataFrame.filter().

# Keep only the rows whose column "a" equals 2.
a_equals_two = df.a == 2
df.filter(a_equals_two).show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
+---+---+-------+----------+-------------------+



In [31]:
## Two or more rows can be selected by combining conditions with |
a_equals_one = df.a == 1
a_equals_two = df.a == 2
df.filter(a_equals_one | a_equals_two).show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
+---+---+-------+----------+-------------------+



## Applying a Function

In [34]:
import pandas as pd
from pyspark.sql.functions import pandas_udf


@pandas_udf('long')
def pandas_plus_one(series: pd.Series) -> pd.Series:
    """Return the input pandas Series with one added to every element.

    The UDF's return type is declared as 'long'.
    """
    incremented = series + 1
    return incremented


# Column b is a double, but the UDF is declared 'long', so the result prints as integers.
plus_one_of_b = pandas_plus_one(df.b)
df.select(plus_one_of_b).show()

+------------------+
|pandas_plus_one(b)|
+------------------+
|                 3|
|                 4|
|                 5|
+------------------+



## Grouping data

In [35]:
# Sample data for grouping: two string columns and two numeric columns.
produce_columns = ['color', 'fruit', 'v1', 'v2']
produce_rows = [
    ['red', 'banana', 1, 10],
    ['blue', 'banana', 2, 20],
    ['red', 'carrot', 3, 30],
    ['blue', 'grape', 4, 40],
    ['red', 'carrot', 5, 50],
    ['black', 'carrot', 6, 60],
    ['red', 'banana', 7, 70],
    ['red', 'grape', 8, 80],
]
df = spark.createDataFrame(produce_rows, schema=produce_columns)
df.show()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|  red|banana|  1| 10|
| blue|banana|  2| 20|
|  red|carrot|  3| 30|
| blue| grape|  4| 40|
|  red|carrot|  5| 50|
|black|carrot|  6| 60|
|  red|banana|  7| 70|
|  red| grape|  8| 80|
+-----+------+---+---+



In [36]:
# Group the rows by color, then average the numeric columns (v1, v2) within each group.

rows_by_color = df.groupby('color')
rows_by_color.avg().show()

+-----+-------+-------+
|color|avg(v1)|avg(v2)|
+-----+-------+-------+
|  red|    4.8|   48.0|
| blue|    3.0|   30.0|
|black|    6.0|   60.0|
+-----+-------+-------+



In [37]:
## The DataFrame can be saved to a file and read back

# mode='overwrite' replaces any previous output at 'foo.csv'. With the default
# mode the write fails when the path already exists, so the cell could not be re-run.
df.write.csv('foo.csv', header=True, mode='overwrite')
spark.read.csv('foo.csv', header=True).show()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|  red|carrot|  5| 50|
|black|carrot|  6| 60|
|  red|banana|  7| 70|
|  red| grape|  8| 80|
|  red|banana|  1| 10|
| blue|banana|  2| 20|
|  red|carrot|  3| 30|
| blue| grape|  4| 40|
+-----+------+---+---+



## Trabalhando com SQL

In [38]:
# Register the DataFrame under the name "tableA" so it can be queried with SQL.
df.createOrReplaceTempView("tableA")

row_count = spark.sql("SELECT count(*) from tableA")
row_count.show()

+--------+
|count(1)|
+--------+
|       8|
+--------+



## Pandas API on Spark

In [39]:
import pandas as pd
import numpy as np
import pyspark.pandas as ps
from pyspark.sql import SparkSession



### Object Creation


In [40]:
# Build a pandas-on-Spark Series from a Python list; np.nan is the missing value.
series_values = [1, 3, 5, np.nan, 6, 8]
s = ps.Series(series_values)
s

0    1.0
1    3.0
2    5.0
3    NaN
4    6.0
5    8.0
dtype: float64

In [41]:
# Build a pandas-on-Spark DataFrame from a dict of columns with an explicit index.
frame_columns = {
    'a': [1, 2, 3, 4, 5, 6],
    'b': [100, 200, 300, 400, 500, 600],
    'c': ["one", "two", "three", "four", "five", "six"],
}
frame_index = [10, 20, 30, 40, 50, 60]
psdf = ps.DataFrame(frame_columns, index=frame_index)

psdf

Unnamed: 0,a,b,c
10,1,100,one
20,2,200,two
30,3,300,three
40,4,400,four
50,5,500,five
60,6,600,six


In [42]:
# Six consecutive daily timestamps starting on 2013-01-01, used as an index below.
dates = pd.date_range(start='20130101', periods=6)
dates

DatetimeIndex(['2013-01-01', '2013-01-02', '2013-01-03', '2013-01-04',
               '2013-01-05', '2013-01-06'],
              dtype='datetime64[ns]', freq='D')

In [43]:
# 6x4 table of standard-normal samples indexed by `dates`, with columns A-D.
# The generator is seeded so every run yields the same data, which makes the
# statistics and sorts computed from it below reproducible.
rng = np.random.default_rng(42)
pdf = pd.DataFrame(rng.standard_normal((6, 4)), index=dates, columns=list('ABCD'))
pdf

Unnamed: 0,A,B,C,D
2013-01-01,0.896047,-0.418702,-0.340148,0.215922
2013-01-02,0.835436,-1.575054,0.554834,0.735782
2013-01-03,0.650008,-2.460789,-1.487523,-2.404403
2013-01-04,1.260803,0.775275,0.092577,-0.244574
2013-01-05,1.219943,1.239785,-0.779335,-0.271625
2013-01-06,-0.347684,0.2903,-0.874761,0.77273


In [44]:
# Now, this pandas DataFrame can be converted to a pandas-on-Spark DataFrame

psdf = ps.from_pandas(pdf)
# Confirm that the result is a pandas-on-Spark DataFrame.
type(psdf)

pyspark.pandas.frame.DataFrame

In [45]:
# Display the pandas-on-Spark DataFrame; it keeps the date index of the pandas frame.
psdf

  series = series.astype(t, copy=False)


Unnamed: 0,A,B,C,D
2013-01-01,0.896047,-0.418702,-0.340148,0.215922
2013-01-02,0.835436,-1.575054,0.554834,0.735782
2013-01-03,0.650008,-2.460789,-1.487523,-2.404403
2013-01-04,1.260803,0.775275,0.092577,-0.244574
2013-01-05,1.219943,1.239785,-0.779335,-0.271625
2013-01-06,-0.347684,0.2903,-0.874761,0.77273


In [46]:
# Build a Spark DataFrame from the pandas DataFrame.
# The printed table has only columns A-D: the date index is not carried over.

spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame(data=pdf)
sdf.show()

+------------------+-------------------+-------------------+-------------------+
|                 A|                  B|                  C|                  D|
+------------------+-------------------+-------------------+-------------------+
|0.8960472472464849|-0.4187019782585993|-0.3401481210901341| 0.2159220388638453|
|0.8354356062656205|-1.5750544930871082| 0.5548342950681278| 0.7357818737554269|
|0.6500083141508058|-2.4607886451156293|-1.4875231761592347| -2.404403306287404|
|1.2608027232549146| 0.7752747528788674|0.09257738242808676|-0.2445744882688423|
|1.2199432474588159| 1.2397853671392858|-0.7793351934570647|  -0.27162450319595|
|-0.347683985835739| 0.2903000242039291|-0.8747606503827858| 0.7727301119847808|
+------------------+-------------------+-------------------+-------------------+



In [47]:
# Creating pandas-on-Spark DataFrame from Spark DataFrame
# The resulting frame is displayed with a 0..5 integer index.

psdf = sdf.pandas_api()
psdf

Unnamed: 0,A,B,C,D
0,0.896047,-0.418702,-0.340148,0.215922
1,0.835436,-1.575054,0.554834,0.735782
2,0.650008,-2.460789,-1.487523,-2.404403
3,1.260803,0.775275,0.092577,-0.244574
4,1.219943,1.239785,-0.779335,-0.271625
5,-0.347684,0.2903,-0.874761,0.77273


In [48]:
# Check the data type of each column

psdf.dtypes

A    float64
B    float64
C    float64
D    float64
dtype: object

In [49]:
## Inspect the index

psdf.index

Int64Index([0, 1, 2, 3, 4, 5], dtype='int64')

In [50]:
## Inspect the column labels

psdf.columns

Index(['A', 'B', 'C', 'D'], dtype='object')

In [51]:
## The DataFrame can be converted to a NumPy array

psdf_array = psdf.to_numpy()
psdf_array



array([[ 0.89604725, -0.41870198, -0.34014812,  0.21592204],
       [ 0.83543561, -1.57505449,  0.5548343 ,  0.73578187],
       [ 0.65000831, -2.46078865, -1.48752318, -2.40440331],
       [ 1.26080272,  0.77527475,  0.09257738, -0.24457449],
       [ 1.21994325,  1.23978537, -0.77933519, -0.2716245 ],
       [-0.34768399,  0.29030002, -0.87476065,  0.77273011]])

In [52]:
# Confirm that the converted object is a NumPy ndarray.
type(psdf_array)

numpy.ndarray

In [53]:
# Showing a quick statistic summary of your data
# (count, mean, std, min, quartiles and max for each column)

psdf.describe()

Unnamed: 0,A,B,C,D
count,6.0,6.0,6.0,6.0
mean,0.752426,-0.358197,-0.472393,-0.199361
std,0.587413,1.425773,0.731912,1.171443
min,-0.347684,-2.460789,-1.487523,-2.404403
25%,0.650008,-1.575054,-0.874761,-0.271625
50%,0.835436,-0.418702,-0.779335,-0.244574
75%,1.219943,0.775275,0.092577,0.735782
max,1.260803,1.239785,0.554834,0.77273


In [54]:
# Transpose the summary so that each column of the data becomes a row.
psdf.describe().T

Unnamed: 0,count,mean,std,min,25%,50%,75%,max
A,6.0,0.752426,0.587413,-0.347684,0.650008,0.835436,1.219943,1.260803
B,6.0,-0.358197,1.425773,-2.460789,-1.575054,-0.418702,0.775275,1.239785
C,6.0,-0.472393,0.731912,-1.487523,-0.874761,-0.779335,0.092577,0.554834
D,6.0,-0.199361,1.171443,-2.404403,-0.271625,-0.244574,0.735782,0.77273


In [55]:
# Sorting by its index, from the largest label to the smallest
psdf.sort_index(ascending=False)

Unnamed: 0,A,B,C,D
5,-0.347684,0.2903,-0.874761,0.77273
4,1.219943,1.239785,-0.779335,-0.271625
3,1.260803,0.775275,0.092577,-0.244574
2,0.650008,-2.460789,-1.487523,-2.404403
1,0.835436,-1.575054,0.554834,0.735782
0,0.896047,-0.418702,-0.340148,0.215922


In [56]:
# Sorting by value: rows are ordered by column C, ascending
psdf.sort_values(by='C')

Unnamed: 0,A,B,C,D
2,0.650008,-2.460789,-1.487523,-2.404403
5,-0.347684,0.2903,-0.874761,0.77273
4,1.219943,1.239785,-0.779335,-0.271625
0,0.896047,-0.418702,-0.340148,0.215922
3,1.260803,0.775275,0.092577,-0.244574
1,0.835436,-1.575054,0.554834,0.735782


In [57]:
# Keep the first four dates and add a new column 'E', which starts out empty.
extended_columns = [*pdf.columns, 'E']
pdf1 = pdf.reindex(index=dates[:4], columns=extended_columns)

In [58]:
# Display the reindexed frame; the new column E has no values yet.
pdf1

Unnamed: 0,A,B,C,D,E
2013-01-01,0.896047,-0.418702,-0.340148,0.215922,
2013-01-02,0.835436,-1.575054,0.554834,0.735782,
2013-01-03,0.650008,-2.460789,-1.487523,-2.404403,
2013-01-04,1.260803,0.775275,0.092577,-0.244574,


In [59]:
## assign the value 1 to column E for the rows at positions 0 and 1
### the loc method selects the rows of the DataFrame

pdf1.loc[dates[0]:dates[1], 'E'] = 1

In [60]:
# Display the frame again: the first two rows of E are now 1.0, the others are still empty.
pdf1

Unnamed: 0,A,B,C,D,E
2013-01-01,0.896047,-0.418702,-0.340148,0.215922,1.0
2013-01-02,0.835436,-1.575054,0.554834,0.735782,1.0
2013-01-03,0.650008,-2.460789,-1.487523,-2.404403,
2013-01-04,1.260803,0.775275,0.092577,-0.244574,


In [61]:
# Convert the pandas frame with missing values into a pandas-on-Spark DataFrame.
psdf1 = ps.from_pandas(pdf1)
# Confirm the type of the converted object.
type(psdf1)

pyspark.pandas.frame.DataFrame

In [62]:
# Display the pandas-on-Spark frame; the missing values in E are preserved.
psdf1

  series = series.astype(t, copy=False)


Unnamed: 0,A,B,C,D,E
2013-01-01,0.896047,-0.418702,-0.340148,0.215922,1.0
2013-01-02,0.835436,-1.575054,0.554834,0.735782,1.0
2013-01-03,0.650008,-2.460789,-1.487523,-2.404403,
2013-01-04,1.260803,0.775275,0.092577,-0.244574,


In [63]:
# To drop any rows that have missing data.
# The result is only displayed; psdf1 is not reassigned.

psdf1.dropna(how='any')

  series = series.astype(t, copy=False)


Unnamed: 0,A,B,C,D,E
2013-01-01,0.896047,-0.418702,-0.340148,0.215922,1.0
2013-01-02,0.835436,-1.575054,0.554834,0.735782,1.0


In [66]:
# Filling missing data.
# Missing entries are replaced by 3.5 in the displayed result; psdf1 is not reassigned.

psdf1.fillna(value=3.5)

  series = series.astype(t, copy=False)


Unnamed: 0,A,B,C,D,E
2013-01-01,0.896047,-0.418702,-0.340148,0.215922,1.0
2013-01-02,0.835436,-1.575054,0.554834,0.735782,1.0
2013-01-03,0.650008,-2.460789,-1.487523,-2.404403,3.5
2013-01-04,1.260803,0.775275,0.092577,-0.244574,3.5


## Stats

In [67]:
# Mean of each column.
psdf.mean()

A    0.752426
B   -0.358197
C   -0.472393
D   -0.199361
dtype: float64