# EVOLUTION OF DATA

Hernan Carlos Chavez Paura Garcia<br>
Feb 8th, 2025<br>
Singh, Pramod. Machine Learning with PySpark: With Natural Language Processing and Recommender Systems. Apress, 2018.

## LOAD AND RELOAD DATA

In [1]:
# pip install pyspark

In [2]:
# We start with importing and creating the SparkSession object first in order to use Spark.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('data_procesing').getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
df = spark.read.csv('MACHINE_LEARNING_WITH_SPARK_MATERIAL/machine-learning-with-pyspark-master/chapter_2_Data_Processing/sample_data.csv', 
                    inferSchema=True, header=True)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/09 04:51:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
df.columns

['ratings', 'age', 'experience', 'family', 'mobile']

In [4]:
len(df.columns)

5

In [5]:
df.count()

33

In [6]:
print((df.count()),(len(df.columns)))

33 5


In [7]:
df.printSchema()

root
 |-- ratings: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- experience: double (nullable = true)
 |-- family: integer (nullable = true)
 |-- mobile: string (nullable = true)



In [8]:
df.show(3)

+-------+---+----------+------+-------+
|ratings|age|experience|family| mobile|
+-------+---+----------+------+-------+
|      3| 32|       9.0|     3|   Vivo|
|      3| 27|      13.0|     3|  Apple|
|      4| 22|       2.5|     0|Samsung|
+-------+---+----------+------+-------+
only showing top 3 rows



In [9]:
df.select('age', 'mobile').show(5)

+---+-------+
|age| mobile|
+---+-------+
| 32|   Vivo|
| 27|  Apple|
| 22|Samsung|
| 37|  Apple|
| 27|     MI|
+---+-------+
only showing top 5 rows



In [10]:
df.describe().show()

+-------+------------------+------------------+------------------+------------------+------+
|summary|           ratings|               age|        experience|            family|mobile|
+-------+------------------+------------------+------------------+------------------+------+
|  count|                33|                33|                33|                33|    33|
|   mean|3.5757575757575757|30.484848484848484|10.303030303030303|1.8181818181818181|  NULL|
| stddev|1.1188806636071336|  6.18527087180309| 6.770731351213326|1.8448330794164254|  NULL|
|    min|                 1|                22|               2.5|                 0| Apple|
|    max|                 5|                42|              23.0|                 5|  Vivo|
+-------+------------------+------------------+------------------+------------------+------+



## ADDING A NEW COLUMN

In [11]:
df.withColumn('age_after_10_years',(df['age'] + 10)).show(10,False)

+-------+---+----------+------+-------+------------------+
|ratings|age|experience|family|mobile |age_after_10_years|
+-------+---+----------+------+-------+------------------+
|3      |32 |9.0       |3     |Vivo   |42                |
|3      |27 |13.0      |3     |Apple  |37                |
|4      |22 |2.5       |0     |Samsung|32                |
|4      |37 |16.5      |4     |Apple  |47                |
|5      |27 |9.0       |1     |MI     |37                |
|4      |27 |9.0       |0     |Oppo   |37                |
|5      |37 |23.0      |5     |Vivo   |47                |
|5      |37 |23.0      |5     |Samsung|47                |
|3      |22 |2.5       |0     |Apple  |32                |
|3      |27 |6.0       |0     |MI     |37                |
+-------+---+----------+------+-------+------------------+
only showing top 10 rows



In [12]:
df.show()

+-------+---+----------+------+-------+
|ratings|age|experience|family| mobile|
+-------+---+----------+------+-------+
|      3| 32|       9.0|     3|   Vivo|
|      3| 27|      13.0|     3|  Apple|
|      4| 22|       2.5|     0|Samsung|
|      4| 37|      16.5|     4|  Apple|
|      5| 27|       9.0|     1|     MI|
|      4| 27|       9.0|     0|   Oppo|
|      5| 37|      23.0|     5|   Vivo|
|      5| 37|      23.0|     5|Samsung|
|      3| 22|       2.5|     0|  Apple|
|      3| 27|       6.0|     0|     MI|
|      2| 27|       6.0|     2|   Oppo|
|      5| 27|       6.0|     2|Samsung|
|      3| 37|      16.5|     5|  Apple|
|      5| 27|       6.0|     0|     MI|
|      4| 22|       6.0|     1|   Oppo|
|      4| 37|       9.0|     2|Samsung|
|      4| 27|       6.0|     1|  Apple|
|      1| 37|      23.0|     5|     MI|
|      2| 42|      23.0|     2|   Oppo|
|      4| 37|       6.0|     0|   Vivo|
+-------+---+----------+------+-------+
only showing top 20 rows



In [13]:
df_test = df.withColumn('age_after_10_years',(df['age'] + 10))
df_test.show()

+-------+---+----------+------+-------+------------------+
|ratings|age|experience|family| mobile|age_after_10_years|
+-------+---+----------+------+-------+------------------+
|      3| 32|       9.0|     3|   Vivo|                42|
|      3| 27|      13.0|     3|  Apple|                37|
|      4| 22|       2.5|     0|Samsung|                32|
|      4| 37|      16.5|     4|  Apple|                47|
|      5| 27|       9.0|     1|     MI|                37|
|      4| 27|       9.0|     0|   Oppo|                37|
|      5| 37|      23.0|     5|   Vivo|                47|
|      5| 37|      23.0|     5|Samsung|                47|
|      3| 22|       2.5|     0|  Apple|                32|
|      3| 27|       6.0|     0|     MI|                37|
|      2| 27|       6.0|     2|   Oppo|                37|
|      5| 27|       6.0|     2|Samsung|                37|
|      3| 37|      16.5|     5|  Apple|                47|
|      5| 27|       6.0|     0|     MI|                3

In [14]:
from pyspark.sql.types import StringType, DoubleType
df.withColumn('age_double', df['age'].cast(DoubleType())).show(10,False)

+-------+---+----------+------+-------+----------+
|ratings|age|experience|family|mobile |age_double|
+-------+---+----------+------+-------+----------+
|3      |32 |9.0       |3     |Vivo   |32.0      |
|3      |27 |13.0      |3     |Apple  |27.0      |
|4      |22 |2.5       |0     |Samsung|22.0      |
|4      |37 |16.5      |4     |Apple  |37.0      |
|5      |27 |9.0       |1     |MI     |27.0      |
|4      |27 |9.0       |0     |Oppo   |27.0      |
|5      |37 |23.0      |5     |Vivo   |37.0      |
|5      |37 |23.0      |5     |Samsung|37.0      |
|3      |22 |2.5       |0     |Apple  |22.0      |
|3      |27 |6.0       |0     |MI     |27.0      |
+-------+---+----------+------+-------+----------+
only showing top 10 rows



## FILTERING DATA

In [15]:
df.filter(df['mobile'] == 'Vivo').show()

+-------+---+----------+------+------+
|ratings|age|experience|family|mobile|
+-------+---+----------+------+------+
|      3| 32|       9.0|     3|  Vivo|
|      5| 37|      23.0|     5|  Vivo|
|      4| 37|       6.0|     0|  Vivo|
|      5| 37|      13.0|     1|  Vivo|
|      4| 37|       6.0|     0|  Vivo|
+-------+---+----------+------+------+



In [16]:
df.filter(df['mobile'] == 'Vivo').select('age','ratings','mobile').show()

+---+-------+------+
|age|ratings|mobile|
+---+-------+------+
| 32|      3|  Vivo|
| 37|      5|  Vivo|
| 37|      4|  Vivo|
| 37|      5|  Vivo|
| 37|      4|  Vivo|
+---+-------+------+



In [17]:
df.filter(df['mobile'] == 'Vivo').filter(df['experience'] > 10).show()

+-------+---+----------+------+------+
|ratings|age|experience|family|mobile|
+-------+---+----------+------+------+
|      5| 37|      23.0|     5|  Vivo|
|      5| 37|      13.0|     1|  Vivo|
+-------+---+----------+------+------+



In [18]:
df.filter((df['mobile'] == 'Vivo')&(df['experience'] > 10)).show()

+-------+---+----------+------+------+
|ratings|age|experience|family|mobile|
+-------+---+----------+------+------+
|      5| 37|      23.0|     5|  Vivo|
|      5| 37|      13.0|     1|  Vivo|
+-------+---+----------+------+------+



## DISTINCT VALUES IN COLUMN

In [19]:
df.select('mobile').distinct().show()

+-------+
| mobile|
+-------+
|     MI|
|   Oppo|
|Samsung|
|   Vivo|
|  Apple|
+-------+



In [20]:
df.select('mobile').distinct().count()

5

## GROUPING DATA

In [21]:
df.groupBy('mobile').count().show(5,False)

+-------+-----+
|mobile |count|
+-------+-----+
|MI     |8    |
|Oppo   |7    |
|Samsung|6    |
|Vivo   |5    |
|Apple  |7    |
+-------+-----+



In [22]:
df.groupBy('mobile').count().orderBy('count', ascending = False).show(5, False)

+-------+-----+
|mobile |count|
+-------+-----+
|MI     |8    |
|Oppo   |7    |
|Apple  |7    |
|Samsung|6    |
|Vivo   |5    |
+-------+-----+



In [23]:
df.groupBy('mobile').mean().show(5,False)

+-------+------------------+------------------+------------------+------------------+
|mobile |avg(ratings)      |avg(age)          |avg(experience)   |avg(family)       |
+-------+------------------+------------------+------------------+------------------+
|MI     |3.5               |30.125            |10.1875           |1.375             |
|Oppo   |2.857142857142857 |28.428571428571427|10.357142857142858|1.4285714285714286|
|Samsung|4.166666666666667 |28.666666666666668|8.666666666666666 |1.8333333333333333|
|Vivo   |4.2               |36.0              |11.4              |1.8               |
|Apple  |3.4285714285714284|30.571428571428573|11.0              |2.7142857142857144|
+-------+------------------+------------------+------------------+------------------+



In [24]:
df.groupBy('mobile').sum().show(5,False)

+-------+------------+--------+---------------+-----------+
|mobile |sum(ratings)|sum(age)|sum(experience)|sum(family)|
+-------+------------+--------+---------------+-----------+
|MI     |28          |241     |81.5           |11         |
|Oppo   |20          |199     |72.5           |10         |
|Samsung|25          |172     |52.0           |11         |
|Vivo   |21          |180     |57.0           |9          |
|Apple  |24          |214     |77.0           |19         |
+-------+------------+--------+---------------+-----------+



In [25]:
df.groupBy('mobile').max().show(5,False)

+-------+------------+--------+---------------+-----------+
|mobile |max(ratings)|max(age)|max(experience)|max(family)|
+-------+------------+--------+---------------+-----------+
|MI     |5           |42      |23.0           |5          |
|Oppo   |4           |42      |23.0           |2          |
|Samsung|5           |37      |23.0           |5          |
|Vivo   |5           |37      |23.0           |5          |
|Apple  |4           |37      |16.5           |5          |
+-------+------------+--------+---------------+-----------+



In [26]:
df.groupBy('mobile').min().show(5,False)

+-------+------------+--------+---------------+-----------+
|mobile |min(ratings)|min(age)|min(experience)|min(family)|
+-------+------------+--------+---------------+-----------+
|MI     |1           |27      |2.5            |0          |
|Oppo   |2           |22      |6.0            |0          |
|Samsung|2           |22      |2.5            |0          |
|Vivo   |3           |32      |6.0            |0          |
|Apple  |3           |22      |2.5            |0          |
+-------+------------+--------+---------------+-----------+



## AGGREGATIONS

In [27]:
df.groupBy('mobile').agg({'experience': 'sum'}).show(5,False)

+-------+---------------+
|mobile |sum(experience)|
+-------+---------------+
|MI     |81.5           |
|Oppo   |72.5           |
|Samsung|52.0           |
|Vivo   |57.0           |
|Apple  |77.0           |
+-------+---------------+



## USER-DEFINED FUNCTIONS (UDF's)

* Conventional UDF
* Pandas UDF

Pandas UDF are much more powerful in terms of speed and processing time.

In [28]:
from pyspark.sql.functions import udf

## CONVENTIONAL UDF: USING TRADITIONAL PYTHON FUNCTION

In [29]:
def price_range(brand):
    if brand in ['Smasung','Apple']:
        return'High Price'
    elif brand == 'MI':
        return 'Mid Price'
    else:
        return 'Low Price'
    
brand_udf = udf(price_range,StringType())

In [30]:
df.withColumn('price_range',brand_udf(df['mobile'])).show(10,False)

[Stage 51:>                                                         (0 + 1) / 1]

+-------+---+----------+------+-------+-----------+
|ratings|age|experience|family|mobile |price_range|
+-------+---+----------+------+-------+-----------+
|3      |32 |9.0       |3     |Vivo   |Low Price  |
|3      |27 |13.0      |3     |Apple  |High Price |
|4      |22 |2.5       |0     |Samsung|Low Price  |
|4      |37 |16.5      |4     |Apple  |High Price |
|5      |27 |9.0       |1     |MI     |Mid Price  |
|4      |27 |9.0       |0     |Oppo   |Low Price  |
|5      |37 |23.0      |5     |Vivo   |Low Price  |
|5      |37 |23.0      |5     |Samsung|Low Price  |
|3      |22 |2.5       |0     |Apple  |High Price |
|3      |27 |6.0       |0     |MI     |Mid Price  |
+-------+---+----------+------+-------+-----------+
only showing top 10 rows



                                                                                

## CONVENTIONAL UDF: USING LAMBDA FUNCTION

In [31]:
age_udf = udf(lambda age: "young" if age <= 30 else "senior", StringType())
df.withColumn("age_group", age_udf(df.age)).show(10,False)

+-------+---+----------+------+-------+---------+
|ratings|age|experience|family|mobile |age_group|
+-------+---+----------+------+-------+---------+
|3      |32 |9.0       |3     |Vivo   |senior   |
|3      |27 |13.0      |3     |Apple  |young    |
|4      |22 |2.5       |0     |Samsung|young    |
|4      |37 |16.5      |4     |Apple  |senior   |
|5      |27 |9.0       |1     |MI     |young    |
|4      |27 |9.0       |0     |Oppo   |young    |
|5      |37 |23.0      |5     |Vivo   |senior   |
|5      |37 |23.0      |5     |Samsung|senior   |
|3      |22 |2.5       |0     |Apple  |young    |
|3      |27 |6.0       |0     |MI     |young    |
+-------+---+----------+------+-------+---------+
only showing top 10 rows



## PANDAS UDF (VECTORIZED UDF)

There are two types of Pandas UDFs:

* Scalar
* GroupedMap

In [32]:
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import IntegerType

def remaining_yrs(age):
    yrs_left = (100 - age)
    return yrs_left

length_udf = pandas_udf(remaining_yrs, IntegerType())
df.withColumn("yrs_left", length_udf(df['age'])).show(10,False)

[Stage 53:>                                                         (0 + 1) / 1]

+-------+---+----------+------+-------+--------+
|ratings|age|experience|family|mobile |yrs_left|
+-------+---+----------+------+-------+--------+
|3      |32 |9.0       |3     |Vivo   |68      |
|3      |27 |13.0      |3     |Apple  |73      |
|4      |22 |2.5       |0     |Samsung|78      |
|4      |37 |16.5      |4     |Apple  |63      |
|5      |27 |9.0       |1     |MI     |73      |
|4      |27 |9.0       |0     |Oppo   |73      |
|5      |37 |23.0      |5     |Vivo   |63      |
|5      |37 |23.0      |5     |Samsung|63      |
|3      |22 |2.5       |0     |Apple  |78      |
|3      |27 |6.0       |0     |MI     |73      |
+-------+---+----------+------+-------+--------+
only showing top 10 rows



                                                                                

## PANDAS UDF (MULTIPLE COLUMNS)

In [33]:
def prod(rating,exp):
    x = rating * exp
    return x

prod_udf = pandas_udf(prod, DoubleType())

df.withColumn("product", prod_udf(df['ratings'],df['experience'])).show(10,False)

+-------+---+----------+------+-------+-------+
|ratings|age|experience|family|mobile |product|
+-------+---+----------+------+-------+-------+
|3      |32 |9.0       |3     |Vivo   |27.0   |
|3      |27 |13.0      |3     |Apple  |39.0   |
|4      |22 |2.5       |0     |Samsung|10.0   |
|4      |37 |16.5      |4     |Apple  |66.0   |
|5      |27 |9.0       |1     |MI     |45.0   |
|4      |27 |9.0       |0     |Oppo   |36.0   |
|5      |37 |23.0      |5     |Vivo   |115.0  |
|5      |37 |23.0      |5     |Samsung|115.0  |
|3      |22 |2.5       |0     |Apple  |7.5    |
|3      |27 |6.0       |0     |MI     |18.0   |
+-------+---+----------+------+-------+-------+
only showing top 10 rows



                                                                                

## DROP DUPLICATE VALUES

In [34]:
df.count()

33

In [35]:
df = df.dropDuplicates()
df.count()

26

## DELETE COLUMN

In [36]:
df_new = df.drop('mobile')
df_new.show()

+-------+---+----------+------+
|ratings|age|experience|family|
+-------+---+----------+------+
|      3| 32|       9.0|     3|
|      4| 22|       2.5|     0|
|      5| 27|       6.0|     0|
|      4| 22|       6.0|     1|
|      3| 27|       6.0|     0|
|      2| 32|      16.5|     2|
|      4| 27|       9.0|     0|
|      2| 27|       9.0|     2|
|      3| 37|      16.5|     5|
|      4| 27|       6.0|     1|
|      5| 37|      23.0|     5|
|      2| 27|       6.0|     2|
|      4| 37|       6.0|     0|
|      5| 37|      23.0|     5|
|      4| 37|       9.0|     2|
|      5| 37|      13.0|     1|
|      5| 27|       2.5|     0|
|      3| 42|      23.0|     5|
|      5| 22|       2.5|     0|
|      1| 37|      23.0|     5|
+-------+---+----------+------+
only showing top 20 rows



## WRITTING DATA: CSV

In [37]:
pwd

'/Users/c105624/Desktop/JUPYTER_NOTEBOOK'

In [38]:
write_uri = '/Users/c105624/Desktop/JUPYTER_NOTEBOOK/df_csv'
df.coalesce(1).write.format("csv").option("header", True).save(write_uri)

## WRITTING DATA: PARQUET

In [39]:
parquet_uri = '/Users/c105624/Desktop/JUPYTER_NOTEBOOK/df_parquet'
df.coalesce(1).write.format('parquet').save(parquet_uri)

                                                                                