# Basic Operations in a Jupyter Notebook

#### Francisco Florido Valero 25740380-E

In this notebook we are going to perform some basic operations using PySpark.

In [1]:
from pyspark.sql import SparkSession, functions

In [2]:
spark_session = SparkSession \
        .builder \
        .master("local[4]") \
        .getOrCreate()

In [3]:
# Reduce Spark's log output: only WARN and above for the "org" loggers
logger = spark_session._jvm.org.apache.log4j
logger.LogManager.getLogger("org").setLevel(logger.Level.WARN)

Load the CSV file into a dataframe (the first line is the header and column types are inferred from the data)

In [4]:
data_frame = spark_session \
        .read \
        .options(header='true', inferschema='true') \
        .option("delimiter", ",") \
        .csv("D:/Documentos/Master/temariomaster/p9.Spark/masterBDpy/data/simple.csv") \
        .persist()

Print the schema and the dataset to see what kind of data we have

In [5]:
data_frame.printSchema()
data_frame.show()

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Weight: double (nullable = true)
 |-- HasACar: boolean (nullable = true)
 |-- BirthDate: string (nullable = true)

+------+---+------+-------+----------+
|  Name|Age|Weight|HasACar| BirthDate|
+------+---+------+-------+----------+
|  Luis| 23|  84.5|   true|2019-02-28|
|  Lola| 42|  70.2|  false|2000-10-01|
|  Paco| 66|  90.1|  false|1905-12-03|
|Manolo| 68|  75.3|   true|2000-01-04|
+------+---+------+-------+----------+



In [6]:
print("data types: " + str(data_frame.dtypes))

data types: [('Name', 'string'), ('Age', 'int'), ('Weight', 'double'), ('HasACar', 'boolean'), ('BirthDate', 'string')]
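
In this run inferSchema left BirthDate as a string rather than a date. Because the values use the ISO-8601 `YYYY-MM-DD` layout, comparing them as strings gives the same order as comparing them as dates, which is why the min and max rows of describe() below are still meaningful. A plain-Python check of that claim on the four values from the dataset (no Spark needed); to get a real date column in Spark one could apply something like `functions.to_date(data_frame["BirthDate"])`, which this notebook does not do:

```python
from datetime import date

# BirthDate values exactly as they appear in simple.csv (kept as strings)
birth_dates = ["2019-02-28", "2000-10-01", "1905-12-03", "2000-01-04"]

# Lexicographic order of the raw strings...
by_string = sorted(birth_dates)
# ...matches chronological order once the strings are parsed as dates
by_date = [d.isoformat() for d in sorted(date.fromisoformat(s) for s in birth_dates)]

print(by_string == by_date)                 # True
print(min(birth_dates), max(birth_dates))   # 1905-12-03 2019-02-28
```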


Describe the dataframe: count, mean, standard deviation, min and max of each column

In [7]:
data_frame \
        .describe() \
        .show()

+-------+----+------------------+-----------------+----------+
|summary|Name|               Age|           Weight| BirthDate|
+-------+----+------------------+-----------------+----------+
|  count|   4|                 4|                4|         4|
|   mean|null|             49.75|80.02499999999999|      null|
| stddev|null|21.391197566600457|8.951489633947338|      null|
|    min|Lola|                23|             70.2|1905-12-03|
|    max|Paco|                68|             90.1|2019-02-28|
+-------+----+------------------+-----------------+----------+
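
The `stddev` row is the sample standard deviation (dividing by n - 1), not the population one. A quick plain-Python cross-check with the standard `statistics` module, using the Age and Weight values from the dataset, reproduces the numbers in the table:

```python
import math
import statistics

ages = [23, 42, 66, 68]
weights = [84.5, 70.2, 90.1, 75.3]

# Sample standard deviation (divides by n - 1): what describe() labels "stddev"
age_sd = statistics.stdev(ages)
weight_sd = statistics.stdev(weights)
# Population standard deviation (divides by n), shown only for contrast
age_psd = statistics.pstdev(ages)

# Compare against the values printed by describe() above
print(math.isclose(age_sd, 21.391197566600457))    # True
print(math.isclose(weight_sd, 8.951489633947338))  # True
print(age_psd)  # smaller, because it divides by 4 instead of 3
```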



Explain the dataframe, i.e. print its physical execution plan (the InMemoryTableScan comes from the earlier persist())

In [8]:
data_frame \
        .explain()

== Physical Plan ==
InMemoryTableScan [Name#16, Age#17, Weight#18, HasACar#19, BirthDate#20]
   +- InMemoryRelation [Name#16, Age#17, Weight#18, HasACar#19, BirthDate#20], StorageLevel(disk, memory, 1 replicas)
         +- FileScan csv [Name#16,Age#17,Weight#18,HasACar#19,BirthDate#20] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/D:/Documentos/Master/temariomaster/p9.Spark/masterBDpy/data/simple.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Name:string,Age:int,Weight:double,HasACar:boolean,BirthDate:string>




Select the Name column of the dataframe

In [9]:
data_frame.select("Name").show()

+------+
|  Name|
+------+
|  Luis|
|  Lola|
|  Paco|
|Manolo|
+------+



Select the Name and Age columns, adding 2 to Age

In [10]:
data_frame.select("Name", data_frame["Age"] + 2) \
        .show()

+------+---------+
|  Name|(Age + 2)|
+------+---------+
|  Luis|       25|
|  Lola|       44|
|  Paco|       68|
|Manolo|       70|
+------+---------+



Compute, for each row, whether the name is longer than 4 characters (this returns a boolean column; it does not filter rows)

In [11]:
data_frame.select(functions.length(data_frame["Name"]) > 4).show()

+------------------+
|(length(Name) > 4)|
+------------------+
|             false|
|             false|
|             false|
|              true|
+------------------+
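
The cell above keeps all four rows and returns one boolean per row. To actually keep only the matching rows, the same condition would go into `filter` (or its alias `where`), e.g. `data_frame.filter(functions.length(data_frame["Name"]) > 4)`. The difference between the two, sketched in plain Python over the Name values:

```python
names = ["Luis", "Lola", "Paco", "Manolo"]

# select(length(Name) > 4): one boolean per input row, row count unchanged
flags = [len(name) > 4 for name in names]

# filter(length(Name) > 4): only the rows where the condition holds survive
kept = [name for name in names if len(name) > 4]

print(flags)  # [False, False, False, True]
print(kept)   # ['Manolo']
```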



Check which names start with L

In [12]:
data_frame.select("name", data_frame["name"].startswith("L")) \
        .show()

+------+-------------------+
|  name|startswith(name, L)|
+------+-------------------+
|  Luis|               true|
|  Lola|               true|
|  Paco|              false|
|Manolo|              false|
+------+-------------------+



Add a new column "Senior" that is true if the person's age is > 45

In [13]:
data_frame.withColumn("Senior", data_frame["Age"] > 45) \
        .show()

+------+---+------+-------+----------+------+
|  Name|Age|Weight|HasACar| BirthDate|Senior|
+------+---+------+-------+----------+------+
|  Luis| 23|  84.5|   true|2019-02-28| false|
|  Lola| 42|  70.2|  false|2000-10-01| false|
|  Paco| 66|  90.1|  false|1905-12-03|  true|
|Manolo| 68|  75.3|   true|2000-01-04|  true|
+------+---+------+-------+----------+------+



Rename column HasACar to Owner

In [14]:
data_frame.withColumnRenamed("HasACar", "Owner") \
        .show()

+------+---+------+-----+----------+
|  Name|Age|Weight|Owner| BirthDate|
+------+---+------+-----+----------+
|  Luis| 23|  84.5| true|2019-02-28|
|  Lola| 42|  70.2|false|2000-10-01|
|  Paco| 66|  90.1|false|1905-12-03|
|Manolo| 68|  75.3| true|2000-01-04|
+------+---+------+-----+----------+



Remove the BirthDate column

In [15]:
data_frame.drop("BirthDate") \
        .show()

+------+---+------+-------+
|  Name|Age|Weight|HasACar|
+------+---+------+-------+
|  Luis| 23|  84.5|   true|
|  Lola| 42|  70.2|  false|
|  Paco| 66|  90.1|  false|
|Manolo| 68|  75.3|   true|
+------+---+------+-------+



Sort by age in descending order (method 1: Column.desc())

In [16]:
data_frame.sort(data_frame.Age.desc()).show()

+------+---+------+-------+----------+
|  Name|Age|Weight|HasACar| BirthDate|
+------+---+------+-------+----------+
|Manolo| 68|  75.3|   true|2000-01-04|
|  Paco| 66|  90.1|  false|1905-12-03|
|  Lola| 42|  70.2|  false|2000-10-01|
|  Luis| 23|  84.5|   true|2019-02-28|
+------+---+------+-------+----------+



Sort by age in descending order (method 2: ascending=False)

In [17]:
data_frame.sort("Age", ascending=False).show()

+------+---+------+-------+----------+
|  Name|Age|Weight|HasACar| BirthDate|
+------+---+------+-------+----------+
|Manolo| 68|  75.3|   true|2000-01-04|
|  Paco| 66|  90.1|  false|1905-12-03|
|  Lola| 42|  70.2|  false|2000-10-01|
|  Luis| 23|  84.5|   true|2019-02-28|
+------+---+------+-------+----------+



Sort by age in descending order and, for equal ages, by weight in ascending order

In [18]:
data_frame.orderBy(["Age", "Weight"], ascending=[0, 1]).show()

+------+---+------+-------+----------+
|  Name|Age|Weight|HasACar| BirthDate|
+------+---+------+-------+----------+
|Manolo| 68|  75.3|   true|2000-01-04|
|  Paco| 66|  90.1|  false|1905-12-03|
|  Lola| 42|  70.2|  false|2000-10-01|
|  Luis| 23|  84.5|   true|2019-02-28|
+------+---+------+-------+----------+
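
Here `ascending=[0, 1]` means Age descending, then Weight ascending. No two people in simple.csv share an age, so the Weight tie-breaker never fires and the result matches the single-key sorts above. A plain-Python sketch of the same composite ordering; the extra row "Ana" is hypothetical (not in the dataset) and exists only to exercise the tie-breaker:

```python
from typing import NamedTuple


class Person(NamedTuple):
    name: str
    age: int
    weight: float


people = [
    Person("Luis", 23, 84.5),
    Person("Lola", 42, 70.2),
    Person("Paco", 66, 90.1),
    Person("Manolo", 68, 75.3),
]

# Age descending (negate it), then Weight ascending as the tie-breaker
ordered = sorted(people, key=lambda p: (-p.age, p.weight))
print([p.name for p in ordered])  # ['Manolo', 'Paco', 'Lola', 'Luis']

# Hypothetical extra row sharing Lola's age, lighter than her
with_tie = people + [Person("Ana", 42, 60.0)]
tied = sorted(with_tie, key=lambda p: (-p.age, p.weight))
print([p.name for p in tied])  # ['Manolo', 'Paco', 'Ana', 'Lola', 'Luis']
```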



Get an RDD from the dataframe

In [19]:
rdd_from_dataframe = data_frame \
        .rdd \
        .persist()

In [20]:
for i in rdd_from_dataframe.collect():
    print(i)

Row(Name='Luis', Age=23, Weight=84.5, HasACar=True, BirthDate='2019-02-28')
Row(Name='Lola', Age=42, Weight=70.2, HasACar=False, BirthDate='2000-10-01')
Row(Name='Paco', Age=66, Weight=90.1, HasACar=False, BirthDate='1905-12-03')
Row(Name='Manolo', Age=68, Weight=75.3, HasACar=True, BirthDate='2000-01-04')


Sum all the weights using the RDD

In [21]:
sum_of_weights = rdd_from_dataframe \
        .map(lambda row: row[2]) \
        .reduce(lambda x, y: x + y)  # same result as .sum()
print("Sum of weights (RDDs): " + str(sum_of_weights))

Sum of weights (RDDs): 320.09999999999997
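
Each element of the RDD is a PySpark `Row`, a tuple subclass whose fields can also be read by name, so `row[2]` and `row.Weight` return the same value; access by name keeps working if the column order changes. `map` followed by `reduce` behaves like `functools.reduce` over the mapped values, and the trailing `...97` is ordinary floating-point rounding (float addition is not associative, so the last digits can depend on how values are grouped across partitions). A sketch with a `namedtuple` standing in for `Row` (an assumption, so the block runs without Spark):

```python
import math
from collections import namedtuple
from functools import reduce

# Stand-in for pyspark.sql.Row: a tuple whose fields are also accessible by name
PersonRow = namedtuple("PersonRow", ["Name", "Age", "Weight", "HasACar", "BirthDate"])

rows = [
    PersonRow("Luis", 23, 84.5, True, "2019-02-28"),
    PersonRow("Lola", 42, 70.2, False, "2000-10-01"),
    PersonRow("Paco", 66, 90.1, False, "1905-12-03"),
    PersonRow("Manolo", 68, 75.3, True, "2000-01-04"),
]

# Positional and named access return the same field
assert rows[0][2] == rows[0].Weight

# Equivalent of rdd.map(lambda row: row.Weight).reduce(lambda x, y: x + y)
total = reduce(lambda x, y: x + y, map(lambda row: row.Weight, rows))

# Float addition is not exact, so compare with a tolerance rather than ==
print(total, math.isclose(total, 320.1))
```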


Sum all the weights using the dataframe API

In [22]:
weights = data_frame \
        .select("Weight") \
        .groupBy() \
        .sum() \
        .collect()

In [23]:
print(weights)
print("Sum of weights (dataframe): " + str(weights[0][0]))

[Row(sum(Weight)=320.09999999999997)]
Sum of weights (dataframe): 320.09999999999997


The same sum using functions.sum() and agg()

In [28]:
data_frame.select(functions.sum(data_frame["Weight"])).show()
data_frame.agg({"Weight": "sum"}).show()

+------------------+
|       sum(Weight)|
+------------------+
|320.09999999999997|
+------------------+

+------------------+
|       sum(Weight)|
+------------------+
|320.09999999999997|
+------------------+



Get the mean age (RDD)

In [24]:
total_age = rdd_from_dataframe \
        .map(lambda row: row[1]) \
        .reduce(lambda x, y: x + y)

In [25]:
mean_age = total_age / rdd_from_dataframe.count()

In [26]:
print("Mean age (RDDs): " + str(mean_age))

Mean age (RDDs): 49.75
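
The cells above run two Spark actions (`reduce`, then `count`), so the RDD is traversed twice (cheap here because it is persisted). A common alternative is a single pass over (sum, count) pairs, which on the RDD would read `rdd_from_dataframe.map(lambda row: (row.Age, 1)).reduce(lambda a, b: (a[0] + b[0], a[1] + b[1]))`. The combining function is component-wise addition, which is associative and commutative as `reduce` requires. The same logic in plain Python over the Age values:

```python
from functools import reduce

ages = [23, 42, 66, 68]

# Pair each value with a count of 1, then add the pairs component-wise
pairs = map(lambda age: (age, 1), ages)
total, count = reduce(lambda a, b: (a[0] + b[0], a[1] + b[1]), pairs)

mean_age = total / count
print(total, count, mean_age)  # 199 4 49.75
```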


Get the mean weight (dataframe)

In [27]:
data_frame.select(functions.avg(data_frame["Weight"])) \
        .withColumnRenamed("avg(Weight)", "Average") \
        .show()

+-----------------+
|          Average|
+-----------------+
|80.02499999999999|
+-----------------+



In [28]:
data_frame.agg({"Weight": "avg"}).show()

+-----------------+
|      avg(Weight)|
+-----------------+
|80.02499999999999|
+-----------------+



Write to a JSON file

In [None]:
data_frame\
        .write\
        .save("output.json", format="json")

Write to a CSV file

In [None]:
data_frame\
        .write\
        .format("csv")\
        .option("header", "true")\
        .save("output.csv")
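
Both writes produce a directory (`output.json/`, `output.csv/`) holding one `part-*` file per partition plus marker files such as `_SUCCESS`, rather than a single file, and by default they fail if the path already exists (`.mode("overwrite")` changes that). The JSON writer emits JSON Lines, one object per line; within Spark it can be read back with `spark_session.read.json("output.json")`. A minimal sketch for reading that directory without Spark, assuming the default `part-*` naming; the two sample lines are illustrative, written in that format from the first two rows of the dataset:

```python
import glob
import json
import os


def parse_json_lines(text):
    """Parse JSON Lines text: one JSON object per non-empty line."""
    return [json.loads(line) for line in text.splitlines() if line.strip()]


def read_spark_json_dir(path):
    """Read every part-* file under `path` (assumes Spark's default naming)."""
    records = []
    for part in sorted(glob.glob(os.path.join(path, "part-*"))):
        with open(part, encoding="utf-8") as handle:
            records.extend(parse_json_lines(handle.read()))
    return records


# Illustrative lines in the JSON Lines format, built from the dataset's first two rows
sample = (
    '{"Name":"Luis","Age":23,"Weight":84.5,"HasACar":true,"BirthDate":"2019-02-28"}\n'
    '{"Name":"Lola","Age":42,"Weight":70.2,"HasACar":false,"BirthDate":"2000-10-01"}\n'
)
print(parse_json_lines(sample)[1]["Name"])  # Lola
```

After running the JSON write cell, `read_spark_json_dir("output.json")` would return one dictionary per row.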