DATAFRAMES IN PYTHON

Create PySpark DataFrame From an Existing RDD

In [0]:
from pyspark import SparkContext
from pyspark.sql import SparkSession


In [0]:
# Get the SparkContext (creating one only if none exists); it is used below to build the RDD.
sc = SparkContext.getOrCreate()

In [0]:
# Session used for every DataFrame operation in this section.
spark = (
    SparkSession.builder
    .appName('PySpark DataFrame From RDD')
    .getOrCreate()
)


In [0]:
# Four rows of (division, four subject marks), distributed over 4 partitions.
rdd = sc.parallelize(
    [
        ('C', 85, 76, 87, 91),
        ('B', 85, 76, 87, 91),
        ('A', 85, 78, 96, 92),
        ('A', 92, 76, 89, 96),
    ],
    numSlices=4,
)


In [0]:
# Confirm that parallelize() produced an RDD before converting it to a DataFrame.
print(type(rdd))


<class 'pyspark.rdd.RDD'>


Converting the RDD into PySpark DataFrame

In [0]:
# Column names, listed in the same order as the fields of each RDD tuple.
sub = [
    'Division',
    'English',
    'Mathematics',
    'Physics',
    'Chemistry',
]
# Only names are given, so the column types are inferred from the data.
marks_df = spark.createDataFrame(rdd, sub)


In [0]:
# The conversion result is a Spark SQL DataFrame, no longer an RDD.
print(type(marks_df))

<class 'pyspark.sql.dataframe.DataFrame'>


In [0]:
# Show the inferred column types: the division is a string, the marks are longs.
marks_df.printSchema()

root
 |-- Division: string (nullable = true)
 |-- English: long (nullable = true)
 |-- Mathematics: long (nullable = true)
 |-- Physics: long (nullable = true)
 |-- Chemistry: long (nullable = true)



In [0]:
# Print the DataFrame rows as a text table.
marks_df.show()

+--------+-------+-----------+-------+---------+
|Division|English|Mathematics|Physics|Chemistry|
+--------+-------+-----------+-------+---------+
|       C|     85|         76|     87|       91|
|       B|     85|         76|     87|       91|
|       A|     85|         78|     96|       92|
|       A|     92|         76|     89|       96|
+--------+-------+-----------+-------+---------+



Create PySpark DataFrame From an External File


In [0]:
from pyspark.sql import SparkSession

In [0]:
# NOTE(review): a session was already created earlier in this notebook, so
# getOrCreate() presumably returns that same session here and the new app
# name may not take effect -- confirm if the name matters.
spark = (
    SparkSession.builder
    .appName('PySpark DataFrame From External Files')
    .getOrCreate()
)

Reading a CSV File

In [0]:
# Read the sales CSV, taking column names from the first row and letting
# Spark infer the column types.
csv_file = (
    spark.read
    .option('sep', ',')
    .option('inferSchema', True)
    .option('header', True)
    .csv('/FileStore/tables/SalesRecords-4.csv')
)


In [0]:
# display() is not imported in this file; presumably it is the notebook
# environment's built-in table renderer (the /FileStore paths suggest Databricks).
display(csv_file)

Region,Country,Item Type,Sales Channel,Order Priority,Order Date,Order ID,Ship Date,Units Sold,Unit Price,Unit Cost,Total Revenue,Total Cost,Total Profit
Australia and Oceania,Tuvalu,Baby Food,Offline,H,2010-05-28,669165933,2010-06-27,9925,255.28,159.42,2533654.0,1582243.5,951410.5
Central America and the Caribbean,Grenada,Cereal,Online,C,2012-08-22,963881480,2012-09-15,2804,205.7,117.11,576782.8,328376.44,248406.36
Europe,Russia,Office Supplies,Offline,L,2014-05-02,341417157,2014-05-08,1779,651.21,524.96,1158502.59,933903.84,224598.75
Sub-Saharan Africa,Sao Tome and Principe,Fruits,Online,C,2014-06-20,514321792,2014-07-05,8102,9.33,6.92,75591.66,56065.84,19525.82
Sub-Saharan Africa,Rwanda,Office Supplies,Offline,L,2013-02-01,115456712,2013-02-06,5062,651.21,524.96,3296425.02,2657347.52,639077.5
Australia and Oceania,Solomon Islands,Baby Food,Online,C,2015-02-04,547995746,2015-02-21,2974,255.28,159.42,759202.72,474115.08,285087.64
Sub-Saharan Africa,Angola,Household,Offline,M,2011-04-23,135425221,2011-04-27,4187,668.27,502.54,2798046.49,2104134.98,693911.51
Sub-Saharan Africa,Burkina Faso,Vegetables,Online,H,2012-07-17,871543967,2012-07-27,8082,154.06,90.93,1245112.92,734896.26,510216.66
Sub-Saharan Africa,Republic of the Congo,Personal Care,Offline,M,2015-07-14,770463311,2015-08-25,6070,81.73,56.67,496101.1,343986.9,152114.2
Sub-Saharan Africa,Senegal,Cereal,Online,H,2014-04-18,616607081,2014-05-30,6593,205.7,117.11,1356180.1,772106.23,584073.87


In [0]:
# Same CSV file as above, this time loaded through the generic format()/load() API.
file_location = "/FileStore/tables/SalesRecords-4.csv"
file_type = "csv"

# Reader options, passed to Spark as strings.
infer_schema = "false"
first_row_is_header = "false"
delimiter = ","

# With the header option disabled the columns are auto-named _c0.._c13 and the
# file's header line appears as the first data row (see the output below).
df = (
    spark.read.format(file_type)
    .options(inferSchema=infer_schema, header=first_row_is_header, sep=delimiter)
    .load(file_location)
)

display(df)


_c0,_c1,_c2,_c3,_c4,_c5,_c6,_c7,_c8,_c9,_c10,_c11,_c12,_c13
Region,Country,Item Type,Sales Channel,Order Priority,Order Date,Order ID,Ship Date,Units Sold,Unit Price,Unit Cost,Total Revenue,Total Cost,Total Profit
Australia and Oceania,Tuvalu,Baby Food,Offline,H,5/28/2010,669165933,6/27/2010,9925,255.28,159.42,2533654.00,1582243.50,951410.50
Central America and the Caribbean,Grenada,Cereal,Online,C,8/22/2012,963881480,9/15/2012,2804,205.70,117.11,576782.80,328376.44,248406.36
Europe,Russia,Office Supplies,Offline,L,5/2/2014,341417157,5/8/2014,1779,651.21,524.96,1158502.59,933903.84,224598.75
Sub-Saharan Africa,Sao Tome and Principe,Fruits,Online,C,6/20/2014,514321792,7/5/2014,8102,9.33,6.92,75591.66,56065.84,19525.82
Sub-Saharan Africa,Rwanda,Office Supplies,Offline,L,2/1/2013,115456712,2/6/2013,5062,651.21,524.96,3296425.02,2657347.52,639077.50
Australia and Oceania,Solomon Islands,Baby Food,Online,C,2/4/2015,547995746,2/21/2015,2974,255.28,159.42,759202.72,474115.08,285087.64
Sub-Saharan Africa,Angola,Household,Offline,M,4/23/2011,135425221,4/27/2011,4187,668.27,502.54,2798046.49,2104134.98,693911.51
Sub-Saharan Africa,Burkina Faso,Vegetables,Online,H,7/17/2012,871543967,7/27/2012,8082,154.06,90.93,1245112.92,734896.26,510216.66
Sub-Saharan Africa,Republic of the Congo,Personal Care,Offline,M,7/14/2015,770463311,8/25/2015,6070,81.73,56.67,496101.10,343986.90,152114.20


In [0]:
# Load a plain-text file: each line becomes one row in a single string
# column named "value".
txt_file = spark.read.format("text").load("/FileStore/tables/sample3.txt")
display(txt_file)

value
Quod equidem non reprehendo;
"Lorem ipsum dolor sit amet, consectetur adipiscing elit. Quibus natura iure responderit non esse verum aliunde finem beate vivendi, a se principia rei gerendae peti; Quae enim adhuc protulisti, popularia sunt, ego autem a te elegantiora desidero. Duo Reges: constructio interrete. Tum Lucius: Mihi vero ista valde probata sunt, quod item fratri puto. Bestiarum vero nullum iudicium puto. Nihil enim iam habes, quod ad corpus referas; Deinde prima illa, quae in congressu solemus: Quid tu, inquit, huc? Et homini, qui ceteris animantibus plurimum praestat, praecipue a natura nihil datum esse dicemus?"
"Iam id ipsum absurdum, maximum malum neglegi. Quod ea non occurrentia fingunt, vincunt Aristonem; Atqui perspicuum est hominem e corpore animoque constare, cum primae sint animi partes, secundae corporis. Fieri, inquam, Triari, nullo pacto potest, ut non dicas, quid non probes eius, a quo dissentias. Equidem e Cn. An dubium est, quin virtus ita maximam partem optineat in rebus humanis, ut reliquas obruat?"
Quis istum dolorem timet?
"Summus dolor plures dies manere non potest? Dicet pro me ipsa virtus nec dubitabit isti vestro beato M. Tubulum fuisse, qua illum, cuius is condemnatus est rogatione, P. Quod si ita sit, cur opera philosophiae sit danda nescio."
"Ex eorum enim scriptis et institutis cum omnis doctrina liberalis, omnis historia."
"Quod si ita est, sequitur id ipsum, quod te velle video, omnes semper beatos esse sapientes. Cum enim fertur quasi torrens oratio, quamvis multa cuiusque modi rapiat, nihil tamen teneas, nihil apprehendas, nusquam orationem rapidam coerceas. Ita redarguitur ipse a sese, convincunturque scripta eius probitate ipsius ac moribus. At quanta conantur! Mundum hunc omnem oppidum esse nostrum! Incendi igitur eos, qui audiunt, vides. Vide, ne magis, inquam, tuum fuerit, cum re idem tibi, quod mihi, videretur, non nova te rebus nomina inponere. Qui-vere falsone, quaerere mittimus-dicitur oculis se privasse; Si ista mala sunt, in quae potest incidere sapiens, sapientem esse non esse ad beate vivendum satis. At vero si ad vitem sensus accesserit, ut appetitum quendam habeat et per se ipsa moveatur, quid facturam putas?"
"Quem si tenueris, non modo meum Ciceronem, sed etiam me ipsum abducas licebit."
"Stulti autem malorum memoria torquentur, sapientes bona praeterita grata recordatione renovata delectant."
Esse enim quam vellet iniquus iustus poterat inpune.


Reading a JSON File

In [0]:
# multiLine lets a single JSON record span several lines of the file.
json_file = (
    spark.read
    .option("multiLine", True)
    .json("/FileStore/tables/example_1.json")
)
display(json_file)

color,fruit,size
Red,Apple,Large


In [0]:
# Whatever the source format, each reader returns the same DataFrame class.
for frame in (csv_file, txt_file, json_file):
    print(type(frame))


<class 'pyspark.sql.dataframe.DataFrame'>
<class 'pyspark.sql.dataframe.DataFrame'>
<class 'pyspark.sql.dataframe.DataFrame'>


In [0]:
# Print the schema of each loaded DataFrame in turn: CSV, text, JSON.
for frame in (csv_file, txt_file, json_file):
    frame.printSchema()

root
 |-- Region: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Item Type: string (nullable = true)
 |-- Sales Channel: string (nullable = true)
 |-- Order Priority: string (nullable = true)
 |-- Order Date: date (nullable = true)
 |-- Order ID: integer (nullable = true)
 |-- Ship Date: date (nullable = true)
 |-- Units Sold: integer (nullable = true)
 |-- Unit Price: double (nullable = true)
 |-- Unit Cost: double (nullable = true)
 |-- Total Revenue: double (nullable = true)
 |-- Total Cost: double (nullable = true)
 |-- Total Profit: double (nullable = true)

root
 |-- value: string (nullable = true)

root
 |-- color: string (nullable = true)
 |-- fruit: string (nullable = true)
 |-- size: string (nullable = true)



 PySpark DataFrame to Pandas DataFrame

In [0]:
# Convert the Spark DataFrame into a local pandas DataFrame.
# NOTE: per the PySpark docs, toPandas() loads all rows into the driver's
# memory, so it is only suitable for small data.
# NOTE: `df` is rebound to Spark DataFrames again in later cells.
df = csv_file.toPandas()

In [0]:
# The cell's last expression is displayed: df is now a pandas DataFrame.
type(df)

pandas.core.frame.DataFrame

Reading Multiple Files at Once

In [0]:
# Passing a list of paths reads all the files into a single DataFrame.
files = [
    '/FileStore/tables/SalesRecords.csv',
    '/FileStore/tables/Sample100.csv',
]
# NOTE(review): in the output below Order ID and Units Sold come back as
# floats (e.g. 9925.0), unlike the single-file read -- presumably schema
# inference over both files widened them; confirm the files share a schema.
df = (
    spark.read
    .option('sep', ',')
    .option('inferSchema', True)
    .option('header', True)
    .csv(files)
)
df.display()

Region,Country,Item Type,Sales Channel,Order Priority,Order Date,Order ID,Ship Date,Units Sold,Unit Price,Unit Cost,Total Revenue,Total Cost,Total Profit
Australia and Oceania,Tuvalu,Baby Food,Offline,H,2010-05-28,669165933.0,2010-06-27,9925.0,255.28,159.42,2533654.0,1582243.5,951410.5
Central America and the Caribbean,Grenada,Cereal,Online,C,2012-08-22,963881480.0,2012-09-15,2804.0,205.7,117.11,576782.8,328376.44,248406.36
Europe,Russia,Office Supplies,Offline,L,2014-05-02,341417157.0,2014-05-08,1779.0,651.21,524.96,1158502.59,933903.84,224598.75
Sub-Saharan Africa,Sao Tome and Principe,Fruits,Online,C,2014-06-20,514321792.0,2014-07-05,8102.0,9.33,6.92,75591.66,56065.84,19525.82
Sub-Saharan Africa,Rwanda,Office Supplies,Offline,L,2013-02-01,115456712.0,2013-02-06,5062.0,651.21,524.96,3296425.02,2657347.52,639077.5
Australia and Oceania,Solomon Islands,Baby Food,Online,C,2015-02-04,547995746.0,2015-02-21,2974.0,255.28,159.42,759202.72,474115.08,285087.64
Sub-Saharan Africa,Angola,Household,Offline,M,2011-04-23,135425221.0,2011-04-27,4187.0,668.27,502.54,2798046.49,2104134.98,693911.51
Sub-Saharan Africa,Burkina Faso,Vegetables,Online,H,2012-07-17,871543967.0,2012-07-27,8082.0,154.06,90.93,1245112.92,734896.26,510216.66
Sub-Saharan Africa,Republic of the Congo,Personal Care,Offline,M,2015-07-14,770463311.0,2015-08-25,6070.0,81.73,56.67,496101.1,343986.9,152114.2
Sub-Saharan Africa,Senegal,Cereal,Online,H,2014-04-18,616607081.0,2014-05-30,6593.0,205.7,117.11,1356180.1,772106.23,584073.87


Selecting, Renaming, Filtering Data in a PySpark DataFrame

In [0]:
# Sample employee records: (name, date of birth as ISO text, gender, salary).
data = [
    ('Ram', '1991-04-01', 'M', 3000),
    ('Mike', '2000-05-19', 'M', 4000),
    ('Rohini', '1978-09-05', 'M', 4000),
    ('Maria', '1967-12-01', 'F', 4000),
    ('Jenis', '1980-02-17', 'F', 1200),
]

# Column names, in the same order as the tuple fields above.
columns = ["Name", "DOB", "Gender", "salary"]

# Build the Spark DataFrame that the rest of this section works on.
df = spark.createDataFrame(data, columns)

df.show()


+------+----------+------+------+
|  Name|       DOB|Gender|salary|
+------+----------+------+------+
|   Ram|1991-04-01|     M|  3000|
|  Mike|2000-05-19|     M|  4000|
|Rohini|1978-09-05|     M|  4000|
| Maria|1967-12-01|     F|  4000|
| Jenis|1980-02-17|     F|  1200|
+------+----------+------+------+



Renaming Using withColumnRenamed()


In [0]:
# withColumnRenamed() returns a new DataFrame; df itself keeps the DOB column.
(
    df
    .withColumnRenamed("DOB", "DateOfBirth")
    .show()
)


+------+-----------+------+------+
|  Name|DateOfBirth|Gender|salary|
+------+-----------+------+------+
|   Ram| 1991-04-01|     M|  3000|
|  Mike| 2000-05-19|     M|  4000|
|Rohini| 1978-09-05|     M|  4000|
| Maria| 1967-12-01|     F|  4000|
| Jenis| 1980-02-17|     F|  1200|
+------+-----------+------+------+



Renaming multiple column names


In [0]:
# Chain one withColumnRenamed() call per column to rename several at once.
(
    df
    .withColumnRenamed("Gender", "Sex")
    .withColumnRenamed("salary", "Amount")
    .show()
)


+------+----------+---+------+
|  Name|       DOB|Sex|Amount|
+------+----------+---+------+
|   Ram|1991-04-01|  M|  3000|
|  Mike|2000-05-19|  M|  4000|
|Rohini|1978-09-05|  M|  4000|
| Maria|1967-12-01|  F|  4000|
| Jenis|1980-02-17|  F|  1200|
+------+----------+---+------+



 RENAMING Using selectExpr()

In [0]:
# selectExpr() takes SQL expressions, so "Name as name" renames during selection.
# Note: this rebinds `data`, which previously held the list of input tuples.
data = df.selectExpr(
    "Name as name",
    "DOB",
    "Gender",
    "salary",
)

data.show()


+------+----------+------+------+
|  name|       DOB|Gender|salary|
+------+----------+------+------+
|   Ram|1991-04-01|     M|  3000|
|  Mike|2000-05-19|     M|  4000|
|Rohini|1978-09-05|     M|  4000|
| Maria|1967-12-01|     F|  4000|
| Jenis|1980-02-17|     F|  1200|
+------+----------+------+------+



 Using select() method

In [0]:
from pyspark.sql.functions import col

# Select every column, renaming "salary" to "Amount" via alias() on its Column.
amount_col = col("salary").alias('Amount')
data = df.select(col("Name"), col("DOB"), col("Gender"), amount_col)
data.show()

+------+----------+------+------+
|  Name|       DOB|Gender|Amount|
+------+----------+------+------+
|   Ram|1991-04-01|     M|  3000|
|  Mike|2000-05-19|     M|  4000|
|Rohini|1978-09-05|     M|  4000|
| Maria|1967-12-01|     F|  4000|
| Jenis|1980-02-17|     F|  1200|
+------+----------+------+------+



Using toDF()


In [0]:
# New names for all columns, in positional order; toDF() needs one per column.
# NOTE(review): " Gender-m/f" starts with a space, so the resulting column
# name does too -- confirm that is intended.
Data_list = [
    "Emp Name",
    "Date of Birth",
    " Gender-m/f",
    "Paid salary",
]

# Unpack the list so each name is passed as a separate argument.
new_df = df.toDF(*Data_list)
new_df.show()


+--------+-------------+-----------+-----------+
|Emp Name|Date of Birth| Gender-m/f|Paid salary|
+--------+-------------+-----------+-----------+
|     Ram|   1991-04-01|          M|       3000|
|    Mike|   2000-05-19|          M|       4000|
|  Rohini|   1978-09-05|          M|       4000|
|   Maria|   1967-12-01|          F|       4000|
|   Jenis|   1980-02-17|          F|       1200|
+--------+-------------+-----------+-----------+



FILTERING DATA

In [0]:
# Keep only the employees earning less than 3500.
# The column is referenced by its declared name, "salary"; the previous
# "Salary" spelling only resolved because Spark's column lookup is
# case-insensitive by default (spark.sql.caseSensitive=false).
filtered_df = df.filter(df['salary'] < 3500)

filtered_df.show()

+-----+----------+------+------+
| Name|       DOB|Gender|salary|
+-----+----------+------+------+
|  Ram|1991-04-01|     M|  3000|
|Jenis|1980-02-17|     F|  1200|
+-----+----------+------+------+



In [0]:
# Equality filter on a string column, using attribute-style column access.
df.filter(df.Gender == 'M').show()

+------+----------+------+------+
|  Name|       DOB|Gender|salary|
+------+----------+------+------+
|   Ram|1991-04-01|     M|  3000|
|  Mike|2000-05-19|     M|  4000|
|Rohini|1978-09-05|     M|  4000|
+------+----------+------+------+



In [0]:
# DOB holds ISO-formatted (yyyy-MM-dd) text, so this string comparison
# orders the same way the dates do. where() is an alias of filter().
df.where(df.DOB > '1990-01-01').show()

+----+----------+------+------+
|Name|       DOB|Gender|salary|
+----+----------+------+------+
| Ram|1991-04-01|     M|  3000|
|Mike|2000-05-19|     M|  4000|
+----+----------+------+------+



In [0]:
# Same predicate as the previous cell, written as a SQL expression string.
df.filter("DOB > '1990-01-01'").show()

+----+----------+------+------+
|Name|       DOB|Gender|salary|
+----+----------+------+------+
| Ram|1991-04-01|     M|  3000|
|Mike|2000-05-19|     M|  4000|
+----+----------+------+------+



In [0]:
# Employees whose name starts with "R".
# The column is referenced by its declared name, "Name"; the previous
# lower-case "name" only resolved because Spark's column lookup is
# case-insensitive by default (spark.sql.caseSensitive=false).
df.filter(df['Name'].startswith('R')).show()

+------+----------+------+------+
|  Name|       DOB|Gender|salary|
+------+----------+------+------+
|   Ram|1991-04-01|     M|  3000|
|Rohini|1978-09-05|     M|  4000|
+------+----------+------+------+



In [0]:
# Employees whose name contains a lower-case "a".
# The column is referenced by its declared name, "Name"; the previous
# lower-case "name" only resolved because Spark's column lookup is
# case-insensitive by default (spark.sql.caseSensitive=false).
df.filter(df['Name'].contains('a')).show()

+-----+----------+------+------+
| Name|       DOB|Gender|salary|
+-----+----------+------+------+
|  Ram|1991-04-01|     M|  3000|
|Maria|1967-12-01|     F|  4000|
+-----+----------+------+------+



In [0]:
# Combine conditions with & (logical AND); each condition is its own Column expression.
is_male = df['Gender'] == 'M'
earns_over_3500 = df['salary'] > 3500
df.filter(is_male & earns_over_3500).show()

+------+----------+------+------+
|  Name|       DOB|Gender|salary|
+------+----------+------+------+
|  Mike|2000-05-19|     M|  4000|
|Rohini|1978-09-05|     M|  4000|
+------+----------+------+------+

