In [1]:
# install pyspark using pip
!pip install --ignore-install -q pyspark
# install findspark using pip
!pip install --ignore-install -q findspark

#from pyspark import SparkConf,SparkContext
from pyspark.sql import SparkSession
import collections
spark = SparkSession.builder.master("local").appName("Ingestion").config('spark.ui.port', '4050').getOrCreate()

In [2]:
# Read CSV file people.csv
df = spark.read.format('csv') \
                .option("inferSchema","true") \
                .option("header","true") \
                .option("sep",";") \
                .load("people.csv")

In [3]:
df.show()

+-----+---+---------+
| name|age|      job|
+-----+---+---------+
|Jorge| 30|Developer|
|  Bob| 32|Developer|
+-----+---+---------+



In [4]:
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)



In [5]:
from pyspark.sql import Row

# assuming df is your DataFrame

new_row = Row(name="Zoey", age=31, job="AI Engineer")
new_df = df.union(df.sparkSession.createDataFrame([new_row]))

new_df.show()

+-----+---+-----------+
| name|age|        job|
+-----+---+-----------+
|Jorge| 30|  Developer|
|  Bob| 32|  Developer|
| Zoey| 31|AI Engineer|
+-----+---+-----------+



In [7]:
new_df.write.csv("people-v2.csv", header=True, mode="overwrite")

In [9]:
df_2 = spark.read.csv("people-v2.csv", header=True, inferSchema=True)

In [10]:
df_2.show()

+-----+---+-----------+
| name|age|        job|
+-----+---+-----------+
|Jorge| 30|  Developer|
|  Bob| 32|  Developer|
| Zoey| 31|AI Engineer|
+-----+---+-----------+



## Read JSON
There are two methods we can use

In [18]:
### Method 1 ###
# df_3 = spark.read.format('json') \
#                 .option("inferSchema","true") \
#                 .load("article.json")

### Method 2 ###
articleDF = spark.read.json("article.json")

In [20]:
articleDF.printSchema()

root
 |-- _corrupt_record: string (nullable = true)



In [46]:
# Need to parse the json file into python dictionary before creating a dataframe
import json

with open("article.json", "r") as file:
    my_json_file = json.load(file)

articleDF = spark.createDataFrame([my_json_file])

# Show the DataFrame
articleDF.show()



+--------------------+--------------------+--------------------+-----------+-------+--------------------+--------------------+
|                body|          final_tldr|            headline|isPublished|    nid|  summarization_jobs|                tldr|
+--------------------+--------------------+--------------------+-----------+-------+--------------------+--------------------+
| <p>COPENHAGEN: A...|<ul>\r\n\t<li>As ...|European countrie...|       true|4487741|{123 -> {start_ti...|[{start_time -> 2...|
+--------------------+--------------------+--------------------+-----------+-------+--------------------+--------------------+



In [49]:
articleDF.select("summarization_jobs", "tldr").show()

+--------------------+--------------------+
|  summarization_jobs|                tldr|
+--------------------+--------------------+
|{123 -> {start_ti...|[{start_time -> 2...|
+--------------------+--------------------+



In [51]:
from pyspark.sql.functions import map_entries

articleDF.select(map_entries(articleDF.summarization_jobs).alias("summarization_jobs_entries")).show()

+--------------------------+
|summarization_jobs_entries|
+--------------------------+
|      [{123, {start_tim...|
+--------------------------+



In [53]:
from pyspark.sql.functions import explode

articleDF.select(explode(articleDF.tldr).alias("tldr_elements")).show()

+--------------------+
|       tldr_elements|
+--------------------+
|{start_time -> 20...|
|{start_time -> 20...|
|{start_time -> 20...|
+--------------------+



## Parquet

### What is Parquet?
Parquet is a columnar storage file format optimized for analytical queries. It stores data by columns rather than rows, which allows for efficient data compression and faster query performance, especially when only a subset of columns is needed.

### Who should use Parquet?
Data engineers, data scientists, and analysts working with large datasets and requiring efficient storage and retrieval for analytical queries should consider using Parquet. It is particularly useful in big data environments and data lakes.

### When should you use Parquet?
- Large Datasets: When working with very large datasets that need to be stored and queried efficiently.
- Columnar Queries: When your queries frequently target specific columns rather than entire rows.
- Data Lakes: When storing data in a data lake for use by multiple analytics services.
- Schema Evolution: When you need flexibility to evolve the schema over time without significant overhead.

### Where is Parquet implemented?
Parquet is commonly used in cloud storage solutions (e.g., AWS S3, Google Cloud Storage, Azure Data Lake Storage), Hadoop Distributed File System (HDFS), and local file systems. It integrates well with various big data processing frameworks like Apache Spark, Hive, and Presto.

In [54]:
# DataFrames can be saved as Parquet files, maintaining the schema information.
articleDF.write.format("parquet").mode("overwrite").save("article.parquet")


In [56]:
# Read in the Parquet file created above.
# Parquet files are self-describing so the schema is preserved.
# The result of loading a parquet file is also a DataFrame.
parquetFile = spark.read.parquet("article.parquet")
# Parquet files can also be used to create a temporary view and then used in SQL statements.
parquetFile.createOrReplaceTempView("parquetFile")
published_articles = spark.sql("SELECT nid FROM parquetFile WHERE isPublished = true")
published_articles.show()

+-------+
|    nid|
+-------+
|4487741|
+-------+



## What is PyArrow?
PyArrow is the Python implementation of Apache Arrow, a cross-language development platform for in-memory data. Apache Arrow provides a standardized columnar memory format optimized for analytical operations and efficient data interchange between different data processing frameworks. PyArrow offers Python bindings to the Arrow C++ libraries, enabling efficient data manipulation and interoperability with other big data tools.

### Who uses PyArrow?
PyArrow is primarily used by data scientists, data engineers, and developers who work with large datasets and require efficient data processing and interchange. It is particularly beneficial for those using Python-based data analysis libraries like NumPy and pandas, and for those working within big data ecosystems involving tools like Apache Spark.

### When should you use PyArrow?
PyArrow should be used in scenarios that involve:
- Large Datasets: When handling large volumes of data that need efficient storage and fast access.
- Data Interchange: When there is a need to transfer data efficiently between different systems or languages, such as between Python and JVM-based systems.
- Performance Optimization: When optimizing the performance of data processing tasks, especially in environments where pandas and NumPy are heavily used.
- In-Memory Analytics: When performing in-memory data analytics that benefit from a columnar data format.

### Where is PyArrow used?
PyArrow is used in various environments:
- Big Data Platforms: Integrated with tools like Apache Spark, Hadoop, and cloud data platforms (e.g., AWS S3, Google Cloud Storage, Azure Data Lake Storage).
- Local Systems: For in-memory data processing on local machines using Python.
-  Data Lakes: For efficient storage and retrieval of large datasets in data lakes.
- Cloud Environments: For data processing and analytics in cloud-based systems.

### Why use PyArrow?
There are several compelling reasons to use PyArrow:
- Performance: PyArrow provides significant performance improvements for data processing tasks by leveraging a columnar memory format and efficient data serialization.
- Interoperability: It enables seamless data interchange between different languages and frameworks, such as between Python (pandas, NumPy) and JVM-based systems (Spark).
- Memory Efficiency: By using a columnar format, PyArrow optimizes memory usage, which is particularly beneficial for large-scale data analytics.
- Integration: PyArrow integrates well with existing Python data analysis libraries and big data tools, making it easy to incorporate into existing workflows.
- Standardization: It provides a standardized way to handle in-memory data, facilitating better collaboration and consistency across different tools and platforms.

### How does PyArrow work?
PyArrow works by providing Python bindings to the Arrow C++ libraries, enabling efficient data manipulation and interchange. Here are some key functionalities:

- Columnar Memory Format: PyArrow uses Apache Arrow's columnar memory format, which organizes data by columns rather than rows, allowing for efficient compression and fast columnar access.
- Data Interchange: It facilitates efficient data transfer between Python and other systems by using Arrow's standardized format, reducing serialization overhead.
- Integration with Pandas: PyArrow can be used as a backend for pandas, allowing pandas DataFrames to store data in Arrow format, improving performance for certain operations.
- Support for Various File Systems: PyArrow supports reading and writing data to various file systems, including local storage, HDFS, and cloud storage like AWS S3.
- Optimized Data Processing: PyArrow can be used to optimize data processing tasks in Spark by enabling efficient data transfer between Python and JVM processes.


In [57]:
import pyarrow.csv as pv
import pyarrow.parquet as pq

hdb_table = pv.read_csv("resale-flat-prices-based-on-registration-date-from-mar-2012-to-dec-2014.csv")

pq.write_table(hdb_table,'resale-flat-prices-based-on-registration-date-from-mar-2012-to-dec-2014.parquet')

hdb_parquet = pq.ParquetFile('resale-flat-prices-based-on-registration-date-from-mar-2012-to-dec-2014.parquet')

print(hdb_parquet.metadata)

# inspect the parquet row group metadata
print(hdb_parquet.metadata.row_group(0))

# inspect the column chunk metadata
print(hdb_parquet.metadata.row_group(0).column(9).statistics)

<pyarrow._parquet.FileMetaData object at 0xffff5974c720>
  created_by: parquet-cpp-arrow version 13.0.0
  num_columns: 10
  num_rows: 52203
  num_row_groups: 1
  format_version: 2.6
  serialized_size: 2079
<pyarrow._parquet.RowGroupMetaData object at 0xffff5973c2c0>
  num_columns: 10
  num_rows: 52203
  total_byte_size: 431095
<pyarrow._parquet.Statistics object at 0xffff5991b790>
  has_min_max: True
  min: 195000.0
  max: 1088888.0
  null_count: 0
  distinct_count: 0
  num_values: 52203
  physical_type: DOUBLE
  logical_type: None
  converted_type (legacy): NONE


In [58]:
# Convert Spark to Pandas
import pandas as pd
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandas_df = df.toPandas()
pandas_df.head()

Unnamed: 0,name,age,job
0,Jorge,30,Developer
1,Bob,32,Developer


In [59]:
spark_df = spark.createDataFrame(pandas_df)
spark_df.show()

+-----+---+---------+
| name|age|      job|
+-----+---+---------+
|Jorge| 30|Developer|
|  Bob| 32|Developer|
+-----+---+---------+

