In [4]:
from pathlib import Path
import pandas as pd
import numpy as np

In [1]:
import findspark 
# Local Spark
findspark.init('/home/ada/anaconda3/lib/python3.8/site-packages/pyspark') 

# Cloudera cluster Spark
# findspark.init(spark_home='/opt/cloudera/parcels/SPARK2-2.3.0.cloudera4-1.cdh5.13.3.p0.611179/lib/spark2/')

In [2]:
# get pyspark shell sessions
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('example_app').master('local[*]').getOrCreate()

In [3]:
# display tables in db
spark.sql("show databases").show()

+---------+
|namespace|
+---------+
|  default|
+---------+



In [47]:
# reading dataset using spark
df = spark.read.option("header",True).format("csv").load('/home/ada/Downloads/dataset_price_personal_computers.csv')
df.show(5)

+---+-----+-----+---+---+------+---+-----+-------+---+-----+
|_c0|price|speed| hd|ram|screen| cd|multi|premium|ads|trend|
+---+-----+-----+---+---+------+---+-----+-------+---+-----+
|  1| 1499|   25| 80|  4|    14| no|   no|    yes| 94|    1|
|  2| 1795|   33| 85|  2|    14| no|   no|    yes| 94|    1|
|  3| 1595|   25|170|  4|    15| no|   no|    yes| 94|    1|
|  4| 1849|   25|170|  8|    14| no|   no|     no| 94|    1|
|  5| 3295|   33|340| 16|    14| no|   no|    yes| 94|    1|
+---+-----+-----+---+---+------+---+-----+-------+---+-----+
only showing top 5 rows



In [49]:
df.describe().show()

+-------+------------------+-----------------+------------------+------------------+-----------------+------------------+----+-----+-------+-----------------+-----------------+
|summary|               _c0|            price|             speed|                hd|              ram|            screen|  cd|multi|premium|              ads|            trend|
+-------+------------------+-----------------+------------------+------------------+-----------------+------------------+----+-----+-------+-----------------+-----------------+
|  count|              6259|             6259|              6259|              6259|             6259|              6259|6259| 6259|   6259|             6259|             6259|
|   mean|            3130.0|2219.576609682058|52.011024125259624|416.60169356127176|8.286946796612877|14.608723438248921|null| null|   null|221.3010065505672|15.92698514139639|
| stddev|1806.9619992314908|580.8039556527061|21.157735384308484| 258.5484451731357|5.631098924402045|0.90511522640

In [14]:
# from pandas to spark (saving table in spark)
machines = pd.read_csv('/home/ada/Downloads/dataset_price_personal_computers.csv')
machines.head()

Unnamed: 0.1,Unnamed: 0,price,speed,hd,ram,screen,cd,multi,premium,ads,trend
0,1,1499,25,80,4,14,no,no,yes,94,1
1,2,1795,33,85,2,14,no,no,yes,94,1
2,3,1595,25,170,4,15,no,no,yes,94,1
3,4,1849,25,170,8,14,no,no,no,94,1
4,5,3295,33,340,16,14,no,no,yes,94,1


In [15]:
machines.reset_index(inplace=True)
machines_spark = spark.createDataFrame(machines)
machines_spark.dtypes

[('index', 'bigint'),
 ('Unnamed: 0', 'bigint'),
 ('price', 'bigint'),
 ('speed', 'bigint'),
 ('hd', 'bigint'),
 ('ram', 'bigint'),
 ('screen', 'bigint'),
 ('cd', 'string'),
 ('multi', 'string'),
 ('premium', 'string'),
 ('ads', 'bigint'),
 ('trend', 'bigint')]

In [16]:
machines_spark.select('speed', 'price').show(5)

+-----+-----+
|speed|price|
+-----+-----+
|   25| 1499|
|   33| 1795|
|   25| 1595|
|   25| 1849|
|   33| 3295|
+-----+-----+
only showing top 5 rows



#### Bits of analysis with spark

In [17]:
machines_spark.count()

6259

In [19]:
# column details
machines_spark.printSchema()

root
 |-- index: long (nullable = true)
 |-- Unnamed: 0: long (nullable = true)
 |-- price: long (nullable = true)
 |-- speed: long (nullable = true)
 |-- hd: long (nullable = true)
 |-- ram: long (nullable = true)
 |-- screen: long (nullable = true)
 |-- cd: string (nullable = true)
 |-- multi: string (nullable = true)
 |-- premium: string (nullable = true)
 |-- ads: long (nullable = true)
 |-- trend: long (nullable = true)



In [28]:
# querying
machines_spark.filter(machines_spark["premium"] == "yes").show(5)

+-----+----------+-----+-----+---+---+------+---+-----+-------+---+-----+
|index|Unnamed: 0|price|speed| hd|ram|screen| cd|multi|premium|ads|trend|
+-----+----------+-----+-----+---+---+------+---+-----+-------+---+-----+
|    0|         1| 1499|   25| 80|  4|    14| no|   no|    yes| 94|    1|
|    1|         2| 1795|   33| 85|  2|    14| no|   no|    yes| 94|    1|
|    2|         3| 1595|   25|170|  4|    15| no|   no|    yes| 94|    1|
|    4|         5| 3295|   33|340| 16|    14| no|   no|    yes| 94|    1|
|    5|         6| 3695|   66|340| 16|    14| no|   no|    yes| 94|    1|
+-----+----------+-----+-----+---+---+------+---+-----+-------+---+-----+
only showing top 5 rows



In [48]:
# descriptive statistics
machines_spark.describe().show()

+-------+------------------+------------------+-----------------+------------------+------------------+-----------------+------------------+----+-----+-------+-----------------+-----------------+
|summary|             index|        Unnamed: 0|            price|             speed|                hd|              ram|            screen|  cd|multi|premium|              ads|            trend|
+-------+------------------+------------------+-----------------+------------------+------------------+-----------------+------------------+----+-----+-------+-----------------+-----------------+
|  count|              6259|              6259|             6259|              6259|              6259|             6259|              6259|6259| 6259|   6259|             6259|             6259|
|   mean|            3129.0|            3130.0|2219.576609682058|52.011024125259624|416.60169356127176|8.286946796612877|14.608723438248921|null| null|   null|221.3010065505672|15.92698514139639|
| stddev|1806.961999

#### Visualisation in spark

In [34]:
# premium price
from pyspark.sql.functions import avg
pp = df.groupby("premium").agg(avg("price"))
display(pp)

# price
display(df[['price']])

# hd price
from pyspark.sql.functions import avg
hp = df[["hd", "price"]]
display(hp)

# ram price
rp = df[["ram", "price"]]
display(rp)

# speed price
sp = df[["speed", "price"]]
display(sp)

# screen price
sp = df[["screen", "price"]]
display(sp)

# cd price
cp = df.groupby("cd").agg(avg("price"))
display(cp)

# multi price
mp = df.groupby("multi").agg(avg("price"))
display(mp)

DataFrame[premium: string, avg(price): double]

DataFrame[hd: bigint, price: bigint]

DataFrame[ram: bigint, price: bigint]

DataFrame[speed: bigint, price: bigint]

DataFrame[screen: bigint, price: bigint]

DataFrame[cd: string, avg(price): double]

DataFrame[multi: string, avg(price): double]

### Hive intergration
> To persist a Spark DataFrame into HDFS, where it can be queried using default Hadoop SQL engine (Hive), one straightforward strategy (not the only one) is to create a temporal view from that DataFrame:

In [35]:
df_hdfs.createOrReplaceTempView("df")

> Once the temporal view is created, it can be used from Spark SQL engine to create a real table using create table as select. Before creating this table, I will create a new database called analytics to store it:

In [37]:
# hive sql queries
# droping table
sql_drop_table = """
drop table if exists analytics.pandas_spark_hive
"""

# droping database
sql_drop_database = """
drop database if exists analytics cascade
"""

# creating database
sql_create_database = """
create database if not exists analytics
location '/user/cloudera/analytics/'
"""

# creating table
sql_create_table = """
create table if not exists analytics.pandas_spark_hive
using parquet
as select price as price, *
from machines_spark
"""

print("dropping database...")
result_drop_db = spark.sql(sql_drop_database)

print("creating database...")
result_create_db = spark.sql(sql_create_database)

print("dropping table...")
result_droptable = spark.sql(sql_drop_table)

print("creating table...")
result_create_table = spark.sql(sql_create_table)

### Apache Arrow (PyArrow) and HDFS

> Apache Arrow, is a in-memory columnar data format created to support high performance operations in Big Data environments (it can be seen as  the parquet format in-memory equivalent). It is developed in C++, but  its Python API is amazing as you will be able to see now, but first of all, please install it:

In [10]:
!conda install pyarrow -y

Collecting package metadata (current_repodata.json): done
Solving environment: done

## Package Plan ##

  environment location: /home/ada/anaconda3

  added / updated specs:
    - pyarrow


The following packages will be SUPERSEDED by a higher-priority channel:

  conda              conda-forge::conda-4.9.2-py38h578d9bd~ --> pkgs/main::conda-4.9.2-py38h06a4308_0


Preparing transaction: done
Verifying transaction: done
Executing transaction: done


In [14]:
!conda install libhdfs3 pyarrow

Collecting package metadata (current_repodata.json): done
Solving environment: failed with initial frozen solve. Retrying with flexible solve.
Solving environment: failed with repodata from current_repodata.json, will retry with next repodata source.
Collecting package metadata (repodata.json): done
Solving environment: failed with initial frozen solve. Retrying with flexible solve.
Solving environment: - 
Found conflicts! Looking for incompatible packages.
This can take several minutes.  Press CTRL-C to abort.
Examining conflict for python-libarchive-c pyyaml wurlitzer sphinxcontrib-jsmat| ^C
                                                                               failed

CondaError: KeyboardInterrupt



In [45]:
import pyarrow as pa
import os
os.environ['ARROW_LIBHDFS_DIR'] = '/opt/cloudera/parcels/CDH-5.14.4-1.cdh5.14.4.p0.3/lib64/'
hdfs_interface = pa.hdfs.connect(host='localhost', port=8020, user='cloudera')

In [None]:
# list files in hdfs
hdfs_interface.ls('/user/cloudera/analytics/pandas_spark_hive/')

In [None]:
# reading files directly from hdfs
table = hdfs_interface.read_parquet('/user/cloudera/analytics/pandas_spark_hive/')

In [None]:
# uploading local files to hdfs
cwd = Path('./data/')
destination_path = '/user/cloudera/analytics/data/'

for f in cwd.rglob('*.*'):
    print(f'uploading {f.name}')
    with open(str(f), 'rb') as f_upl:
        hdfs_interface.upload(destination_path + f.name, f_upl)

### impyla: Hive + Impala SQL

In [39]:
!conda install impyla thrift_sasl -y

Collecting package metadata (current_repodata.json): done
Solving environment: done

## Package Plan ##

  environment location: /home/ada/anaconda3

  added / updated specs:
    - impyla
    - thrift_sasl


The following packages will be downloaded:

    package                    |            build
    ---------------------------|-----------------
    cyrus-sasl-2.1.27          |       hf484d3e_7         276 KB
    impyla-0.16.3              |           py38_0         357 KB
    libdb-6.1.26               |       he6710b0_0        17.7 MB
    sasl-0.2.1                 |   py38h779454e_1          58 KB
    thrift-0.13.0              |   py38he6710b0_0         120 KB
    thrift_sasl-0.4.2          |           py38_1          10 KB
    thriftpy-0.3.9             |   py38h7b6447c_2         199 KB
    ------------------------------------------------------------
                                           Total:        18.7 MB

The following NEW packages will be INSTALLED:

  cyrus-sasl   

In [41]:
!pip install thriftpy2

Collecting thriftpy2
  Downloading thriftpy2-0.4.12.tar.gz (356 kB)
[K     |████████████████████████████████| 356 kB 58 kB/s eta 0:00:011
Building wheels for collected packages: thriftpy2
  Building wheel for thriftpy2 (setup.py) ... [?25ldone
[?25h  Created wheel for thriftpy2: filename=thriftpy2-0.4.12-cp38-cp38-linux_x86_64.whl size=1171548 sha256=9ecb423b76170250ee6dcd73085a136635fb211d92406834b435ccc487bc2033
  Stored in directory: /home/ada/.cache/pip/wheels/ad/a6/d0/c948df29931021b048a99d5ab2bc46d5f348b657342f23c075
Successfully built thriftpy2
Installing collected packages: thriftpy2
Successfully installed thriftpy2-0.4.12


In [42]:
from impala.dbapi import connect
from impala.util import as_pandas

In [44]:
# from hive to pandas
hive_conn = connect(host='localhost', port=10000, database='analytics', auth_mechanism='PLAIN')

with hive_conn.cursor() as c:
    c.execute('SELECT * FROM analytics.pandas_spark_hive LIMIT 100')
    results = c.fetchall()
    
with hive_conn.cursor() as c:
    c.execute('SELECT * FROM analytics.pandas_spark_hive LIMIT 100')
    results_df = as_pandas(c)