In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m4.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=30c500897488b80aa4cb5771ab4d57ee359d2fc8c5794893fcd27207554ca336
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


PySpark requires a SparkSession, which is the entry point for running work on a cluster (or locally).

In [None]:

#Import the spark session builder
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import col
from pyspark.sql.functions import *

# Create the session, or reuse the active one, using a local master with
# two worker threads.
spark = (
    SparkSession.builder
    .appName("MySparkSession")
    .master("local[2]")
    .getOrCreate()
)

In [None]:
# Confirm which Spark release the session is running.
print(spark.version)

3.5.0


### Basics of PySpark

In [None]:
# Read the CSV file: the first row supplies the column names and Spark
# infers each column's type from the data.
# NOTE(review): missing values appear as "?" in this file, so any column that
# contains one is inferred as string. Passing nullValue="?" to the reader
# would load them as nulls instead — confirm before changing, because it
# alters every downstream statistic.
prev = spark.read.csv("block_1.csv", header=True, inferSchema=True)

# toDF returns a NEW DataFrame whose columns are renamed positionally; `prev`
# itself is left untouched. Every name must be unique, otherwise later
# references to a duplicated name are ambiguous.
prev_renamed = prev.toDF(
    "col1", "col2",
    "CMPfirstnameFIRST", "CMPfirstnameLAST",
    "CMPlastnameFIRST", "CMPlastnameLAST",
    "sex", "col8", "col9", "col10", "col11", "col12",
)

# Show the original DataFrame
print("Original DataFrame:")
prev.show()

# Show the DataFrame with renamed columns
print("\nDataFrame with Renamed Columns:")
prev_renamed.show()


Original DataFrame:
+-----+-----+-----------------+------------+------------+------------+-------+------+------+------+-------+--------+
| id_1| id_2|     cmp_fname_c1|cmp_fname_c2|cmp_lname_c1|cmp_lname_c2|cmp_sex|cmp_bd|cmp_bm|cmp_by|cmp_plz|is_match|
+-----+-----+-----------------+------------+------------+------------+-------+------+------+------+-------+--------+
|37291|53113|0.833333333333333|           ?|         1.0|           ?|      1|     1|     1|     1|      0|    true|
|39086|47614|                1|           ?|         1.0|           ?|      1|     1|     1|     1|      1|    true|
|70031|70237|                1|           ?|         1.0|           ?|      1|     1|     1|     1|      1|    true|
|84795|97439|                1|           ?|         1.0|           ?|      1|     1|     1|     1|      1|    true|
|36950|42116|                1|           ?|         1.0|           1|      1|     1|     1|     1|      1|    true|
|42413|48491|                1|           ?|

### Analyzing Data with the DataFrame API

In [None]:
# Print the column names and inferred types (comparable to prev.info() in pandas).
prev.printSchema()

root
 |-- id_1: integer (nullable = true)
 |-- id_2: integer (nullable = true)
 |-- cmp_fname_c1: string (nullable = true)
 |-- cmp_fname_c2: string (nullable = true)
 |-- cmp_lname_c1: double (nullable = true)
 |-- cmp_lname_c2: string (nullable = true)
 |-- cmp_sex: integer (nullable = true)
 |-- cmp_bd: string (nullable = true)
 |-- cmp_bm: string (nullable = true)
 |-- cmp_by: string (nullable = true)
 |-- cmp_plz: string (nullable = true)
 |-- is_match: boolean (nullable = true)



In [None]:
# Preview only the first five rows instead of the default twenty.
prev.show(n=5)

+-----+-----+-----------------+------------+------------+------------+-------+------+------+------+-------+--------+
| id_1| id_2|     cmp_fname_c1|cmp_fname_c2|cmp_lname_c1|cmp_lname_c2|cmp_sex|cmp_bd|cmp_bm|cmp_by|cmp_plz|is_match|
+-----+-----+-----------------+------------+------------+------------+-------+------+------+------+-------+--------+
|37291|53113|0.833333333333333|           ?|         1.0|           ?|      1|     1|     1|     1|      0|    true|
|39086|47614|                1|           ?|         1.0|           ?|      1|     1|     1|     1|      1|    true|
|70031|70237|                1|           ?|         1.0|           ?|      1|     1|     1|     1|      1|    true|
|84795|97439|                1|           ?|         1.0|           ?|      1|     1|     1|     1|      1|    true|
|36950|42116|                1|           ?|         1.0|           1|      1|     1|     1|     1|      1|    true|
+-----+-----+-----------------+------------+------------+-------

You access the DataFrameReader API through the `read` attribute of a SparkSession instance, and you can load data from a file using either the `format` and `load` methods or one of the shortcut methods for built-in formats:

In [None]:
# Imported here so the cell is self-contained.
from pyspark.sql.utils import AnalysisException

JSON_PATH = "file.json"  # placeholder: point this at a real JSON file

try:
    # Generic form: choose the format by name, then load.
    d1 = spark.read.format("json").load(JSON_PATH)
    # Shortcut form for the built-in JSON source.
    d2 = spark.read.json(JSON_PATH)
except AnalysisException as err:
    # The placeholder file may not exist; report it instead of aborting a
    # "Run All" of the notebook.
    print(f"Could not read {JSON_PATH!r}: {err}")

AnalysisException: ignored

The main difference lies in the level of control and customization. If you have specific requirements or need to set detailed options for reading JSON files, you might prefer using the format("json").load("file.json") approach.

On the other hand, if you have a standard JSON file and want a concise and simple way to read it into a DataFrame, the read.json("file.json") method is a convenient shortcut.

In [None]:
# Count the number of rows in the DataFrame.
prev.count()

In [None]:
# cache() marks the DataFrame to be kept in memory once it has been computed,
# so the repeated actions in the following cells do not have to re-read and
# re-parse the CSV file each time.
prev.cache()

In [None]:
# Row count for each is_match value, largest group first.
match_counts = prev.groupBy("is_match").count()
match_counts.orderBy(col("count").desc()).show()

# The same aggregation written as SQL, kept as a string for reference.
"""
SELECT is_match, COUNT(*) AS count
    FROM prev
    GROUP BY is_match
    ORDER BY count DESC

for SparkSQL
"""

### DataFrame aggregation functions

In [None]:
# Re-check the column types before choosing columns to aggregate.
prev.printSchema()

In [None]:
# Average and standard deviation of the cmp_sex column, shown as one row.
sex_stats = prev.agg(avg("cmp_sex"), stddev("cmp_sex"))
sex_stats.show()

In [None]:
# Basic aggregate functions
# NOTE: sum, max and min below are the Spark column functions brought in by
# the star import of pyspark.sql.functions, not the Python builtins.


def _single_value(expression):
    """Evaluate one aggregate expression over `prev` and return its value."""
    return prev.select(expression.alias("value")).first()["value"]


# Number of rows in the dataset
print("Count: ", _single_value(count("*")))

# Sum of the 'cmp_sex' column
print("Sum:", _single_value(sum("cmp_sex")))

# Average of the 'cmp_lname_c1' column
print("Average:", _single_value(avg("cmp_lname_c1")))

# Maximum and minimum values of the 'cmp_bd' column, fetched together
bd_extremes = prev.select(
    max("cmp_bd").alias("max_cmp_bd"),
    min("cmp_bd").alias("min_cmp_bd"),
).first()
print("Max and Min:", bd_extremes["max_cmp_bd"], bd_extremes["min_cmp_bd"])

# Distinct values in the 'cmp_plz' column, flattened from rows to plain values
plz_values = prev.select("cmp_plz").distinct().rdd.flatMap(lambda row: row).collect()
print("Distinct Values:", plz_values)

# Standard deviation of the 'cmp_lname_c1' column
print("Standard Deviation:", _single_value(stddev("cmp_lname_c1")))

# Variance of the 'cmp_lname_c1' column
print("Variance:", _single_value(variance("cmp_lname_c1")))

# Correlation between 'cmp_sex' and 'cmp_lname_c1'
print("Correlation:", _single_value(corr("cmp_sex", "cmp_lname_c1")))

# Sample covariance between 'cmp_sex' and 'cmp_lname_c1'
print("Covariance:", _single_value(covar_samp("cmp_sex", "cmp_lname_c1")))

# First element of the 'cmp_plz' column
print("First Element:", _single_value(first("cmp_plz")))

# Last element of the 'cmp_plz' column
print("Last Element:", _single_value(last("cmp_plz")))


In [None]:
# Tabular format: collect the aggregate expressions first, then evaluate them
# in a single agg() call so they are displayed side by side.
sex_summary_exprs = [
    avg("cmp_sex").alias("avg_cmp_sex"),
    stddev("cmp_sex").alias("stddev_cmp_sex"),
    max("cmp_sex").alias("max_cmp_sex"),
    min("cmp_sex").alias("min_cmp_sex"),
    variance("cmp_sex").alias("variance_cmp_sex"),
    corr("cmp_sex", "cmp_lname_c1").alias("correlation_cmp_sex"),
    covar_samp("cmp_sex", "cmp_lname_c1").alias("covariance_cmp_sex"),
    first("cmp_sex").alias("first_cmp_sex"),
    last("cmp_sex").alias("last_cmp_sex"),
]
prev.agg(*sex_summary_exprs).show()


You have the option of running Spark either by using an ANSI 2003-compliant version of Spark SQL (the default) or in HiveQL mode by calling the enableHiveSupport method when you create a SparkSession instance via its Builder API.

In [None]:
# Register the DataFrame as a temporary view named "linkage" so that it can
# be referenced by that name in SQL passed to spark.sql().
prev.createOrReplaceTempView("linkage")

In [None]:
# Example: query the registered view with plain SQL.
# Any kind of SQL query can be run this way through Spark SQL.
linkage_query = """
  Select id_1, is_match
  From linkage
"""
spark.sql(linkage_query).show()

You can connect to a Hive metastore via a hive-site.xml file, and you can also use HiveQL in queries by calling the enableHiveSupport method on the SparkSession Builder API. However, sticking to Spark SQL lets the application handle data more consistently across the Spark environment.



In [None]:
# Build a session with Hive support enabled.
# NOTE(review): getOrCreate() returns the already-active session when one
# exists, so after the cell that created `spark` these builder settings may
# not take effect — confirm against the SparkSession.Builder documentation.
spark_session = (
    SparkSession.builder
    .master("local[4]")
    .enableHiveSupport()
    .getOrCreate()
)

### Fast summary statistics for DataFrames

Although there are many kinds of analyses that may be expressed equally well in SQL or with the DataFrame API, there are certain common things that we want to be able to do with DataFrames that can be tedious to express in SQL. One such analysis that is especially helpful is computing the min, max, mean, and standard deviation of all the non-null values in the numerical columns of a DataFrame. In PySpark, this function has the same name that it does in pandas: `describe`.

In [None]:
# describe() returns a new DataFrame holding the count, mean, stddev, min and
# max statistics: one column per described input column, plus a leading
# "summary" column that labels each statistic. It is a quick way to inspect
# the distribution of the data.
summary = prev.describe()
summary.show()

# Narrow the wide result down to the columns of interest.
print("Select specific column")
fname_columns = ["summary", "cmp_fname_c1", "cmp_fname_c2"]
summary.select(*fname_columns).show()

+-------+------------------+-----------------+------------------+------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------------+
|summary|              id_1|             id_2|      cmp_fname_c1|      cmp_fname_c2|       cmp_lname_c1|       cmp_lname_c2|            cmp_sex|             cmp_bd|             cmp_bm|             cmp_by|             cmp_plz|
+-------+------------------+-----------------+------------------+------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------------+
|  count|            574913|           574913|            574913|            574913|             574913|             574913|             574913|             574913|             574913|             574913|              574913|
|   mean|33271.962171667714| 66564.6636865056|0.7127592938252765|0.8977586763518972|0.3155724578

In [None]:
# Data filtering

# where() accepts the condition as a SQL expression string.
matches = prev.where("is_match = true")
# Keep the describe() DataFrame itself: show() only prints and returns None,
# so chaining it onto the assignment would store None in the variable.
match_summary = matches.describe()
match_summary.show()

# filter() does the same job; here the condition is a Column expression,
# which is why `== False` is used rather than `is False` or `not`.
misses = prev.filter(col("is_match") == False)
miss_summary = misses.describe()
miss_summary.show()

+-------+------------------+-----------------+-------------------+-------------------+--------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+
|summary|              id_1|             id_2|       cmp_fname_c1|       cmp_fname_c2|        cmp_lname_c1|       cmp_lname_c2|            cmp_sex|             cmp_bd|             cmp_bm|             cmp_by|            cmp_plz|
+-------+------------------+-----------------+-------------------+-------------------+--------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+
|  count|              2093|             2093|               2093|               2093|                2093|               2093|               2093|               2093|               2093|               2093|               2093|
|   mean| 34440.86956521739|50889.32345914955| 0.9970329792424486| 0.9955357142857143|  

### Pivoting and reshaping DataFrames

We can transpose the DataFrames entirely using functions provided by PySpark. However, there is another way to perform this task: PySpark allows conversion between Spark and pandas DataFrames. We will convert the DataFrames in question into pandas DataFrames, reshape them, and convert them back to Spark DataFrames.

In [None]:
# Bring the small describe() result to the driver as a pandas DataFrame.
summary_p = summary.toPandas()

# Only the last expression of a cell is echoed automatically, so the preview
# has to be displayed explicitly.
display(summary_p.head())

# (rows, columns) of the pandas frame
summary_p.shape

(5, 12)

In [None]:
# Pivot the describe() output so that each described column becomes a row.
# Starting from `summary` (rather than reassigning `summary_p` from itself)
# keeps the cell idempotent: it can be re-run without failing on a "summary"
# column that an earlier run already turned into the index.
summary_p = (
    summary.toPandas()
    .set_index("summary")                # statistic names become the index
    .transpose()                         # statistics -> columns, fields -> rows
    .reset_index()                       # field names back into a regular column
    .rename(columns={"index": "field"})
    .rename_axis(None, axis=1)           # drop the leftover "summary" axis label
)

TypeError: ignored

In [None]:
# Convert the pivoted pandas frame back into a Spark DataFrame and preview it.
summaryT = spark.createDataFrame(summary_p)
summaryT.show()

+------------+------+--------------------+-------------------+---+------+
|       field| count|                mean|             stddev|min|   max|
+------------+------+--------------------+-------------------+---+------+
|        id_1|574913|  33271.962171667714|  23622.66942593358|  1| 99894|
|        id_2|574913|    66564.6636865056|  23642.00230967225|  6|100000|
|cmp_fname_c1|574913|  0.7127592938252765| 0.3889286452463553|  0|     ?|
|cmp_fname_c2|574913|  0.8977586763518972| 0.2742577520430534|  0|     ?|
|cmp_lname_c1|574913| 0.31557245780987964| 0.3342494687554251|0.0|   1.0|
|cmp_lname_c2|574913| 0.32691554145529067|0.37830920205406704|  0|     ?|
|     cmp_sex|574913|  0.9550923357099248|0.20710152240504406|  0|     1|
|      cmp_bd|574913| 0.22475563232907309| 0.4174216587235586|  0|     ?|
|      cmp_bm|574913|  0.4886361857246487|0.49987128182816276|  0|     ?|
|      cmp_by|574913| 0.22266639529199742|  0.416036504164562|  0|     ?|
|     cmp_plz|574913|0.005494946113964

### Joining DataFrame and selecting Features

So far we have used Spark SQL and the DataFrame API only to filter and aggregate the records from a dataset, but we can also use these tools to perform joins (inner, left outer, right outer, or full outer) on DataFrames. Although the DataFrame API includes a join function, it’s often easier to express these joins using Spark SQL, especially when the tables we are joining have a large number of column names in common and we want to be able to clearly indicate which column we are referring to in our select expressions.

In [None]:
def pivot_summary(desc):
    """Transpose a describe() result so each described column becomes a row.

    Args:
        desc: Spark DataFrame returned by DataFrame.describe(); it must
            contain the "summary" column that labels each statistic.

    Returns:
        Spark DataFrame with a "field" column holding the original column
        names, followed by one double column per statistic. Under Spark's
        default (non-ANSI) settings, values that cannot be parsed as numbers
        become null.
    """
    desc_p = (
        desc.toPandas()
        .set_index("summary")
        .transpose()
        .reset_index()
        .rename(columns={"index": "field"})
        .rename_axis(None, axis=1)
    )
    descT = spark.createDataFrame(desc_p)
    # describe() reports its statistics as strings; cast them so the SQL
    # arithmetic below operates on numbers.
    for name in descT.columns:
        if name != "field":
            descT = descT.withColumn(name, descT[name].cast(DoubleType()))
    return descT


# Summaries are computed directly from the filtered DataFrames, so this cell
# depends only on `matches` and `misses`.
match_summaryT = pivot_summary(matches.describe())
miss_summaryT = pivot_summary(misses.describe())

match_summaryT.createOrReplaceTempView("match_desc")
miss_summaryT.createOrReplaceTempView("miss_desc")

# Rank the fields by how far the mean differs between matches and misses.
spark.sql("""
  SELECT a.field, a.count + b.count total, a.mean - b.mean delta
  FROM match_desc a INNER JOIN miss_desc b ON a.field = b.field
  WHERE a.field NOT IN ("id_1", "id_2")
  ORDER BY delta DESC, total DESC
""").show()

NameError: ignored