In [1]:
from pyspark.sql import SparkSession
import findspark
import pyarrow.csv as pv
import pyarrow.parquet as pq
import pandas as pd
findspark.find()
findspark.init()
spark=SparkSession.builder.appName('spark_parquet_example').getOrCreate()
spark

In [2]:
df2=pv.read_csv(r'C:\Users\venka\My Python Work\Pyspark\10000 Sales Records.csv')
pq.write_table(df2,"file.parquet")

In [3]:
data=spark.read.parquet('file.parquet')

%timeit data['Region',].distinct().show()

In [4]:
df=data.toPandas()

df

%timeit df['Region'].unique()

In [5]:
df['Region'].unique()

array(['Sub-Saharan Africa', 'Europe', 'Middle East and North Africa',
       'Asia', 'Central America and the Caribbean',
       'Australia and Oceania', 'North America'], dtype=object)

In [6]:
data['Region',].show()

+--------------------+
|              Region|
+--------------------+
|  Sub-Saharan Africa|
|              Europe|
|Middle East and N...|
|  Sub-Saharan Africa|
|              Europe|
|  Sub-Saharan Africa|
|                Asia|
|                Asia|
|  Sub-Saharan Africa|
|Central America a...|
|  Sub-Saharan Africa|
|              Europe|
|              Europe|
|                Asia|
|Middle East and N...|
|Australia and Oce...|
|Central America a...|
|              Europe|
|Middle East and N...|
|              Europe|
+--------------------+
only showing top 20 rows



data.filter(['Region'=='Asia'])

In [7]:
data.printSchema()

root
 |-- Region: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Item_Type: string (nullable = true)
 |-- Sales_Channel: string (nullable = true)
 |-- Order_Priority: string (nullable = true)
 |-- Order_Date: string (nullable = true)
 |-- Order_ID: long (nullable = true)
 |-- Ship_Date: string (nullable = true)
 |-- Units_Sold: long (nullable = true)
 |-- Unit_Price: double (nullable = true)
 |-- Unit_Cost: double (nullable = true)
 |-- Total_Revenue: double (nullable = true)
 |-- Total_Cost: double (nullable = true)
 |-- Total_Profit: double (nullable = true)



In [8]:
data.createOrReplaceTempView("books_table")

In [9]:
def fetch(query):
    return spark.sql(query)

In [10]:
data_sql=spark.sql("select Region,count(*) from books_table group by Region ")

In [11]:
data_sql.show()

+--------------------+--------+
|              Region|count(1)|
+--------------------+--------+
|Middle East and N...|    1264|
|Australia and Oce...|     797|
|              Europe|    2633|
|  Sub-Saharan Africa|    2603|
|Central America a...|    1019|
|       North America|     215|
|                Asia|    1469|
+--------------------+--------+



In [12]:
query="select Region,count(*) from books_table group by Region "
out=fetch(query)
out.show()

+--------------------+--------+
|              Region|count(1)|
+--------------------+--------+
|Middle East and N...|    1264|
|Australia and Oce...|     797|
|              Europe|    2633|
|  Sub-Saharan Africa|    2603|
|Central America a...|    1019|
|       North America|     215|
|                Asia|    1469|
+--------------------+--------+



In [13]:
query="select Region,count(*) from books_table group by Region order by Region asc "
out=fetch(query)
out.show()

+--------------------+--------+
|              Region|count(1)|
+--------------------+--------+
|                Asia|    1469|
|Australia and Oce...|     797|
|Central America a...|    1019|
|              Europe|    2633|
|Middle East and N...|    1264|
|       North America|     215|
|  Sub-Saharan Africa|    2603|
+--------------------+--------+



In [14]:
query="""select Region,count(*), row_number() over(order by Region) as row 
        from books_table 
        group by Region 
        order by Region desc"""
out=fetch(query)
out.show()

+--------------------+--------+---+
|              Region|count(1)|row|
+--------------------+--------+---+
|  Sub-Saharan Africa|    2603|  7|
|       North America|     215|  6|
|Middle East and N...|    1264|  5|
|              Europe|    2633|  4|
|Central America a...|    1019|  3|
|Australia and Oce...|     797|  2|
|                Asia|    1469|  1|
+--------------------+--------+---+



In [15]:
out_data=out.tail(1)

In [16]:
type(out_data)

list

In [17]:
for i in out_data:
    print(i['Region'])

Asia


In [18]:
query="""select Region,round(sum(Total_Revenue)/1000000,3) as Total_Revenue_in_Billion
        from books_table
        group by Region
        order by Total_Revenue_in_Billion desc"""
out=fetch(query)
out.show()

+--------------------+------------------------+
|              Region|Total_Revenue_in_Billion|
+--------------------+------------------------+
|              Europe|                3481.372|
|  Sub-Saharan Africa|                3350.556|
|                Asia|                2005.306|
|Middle East and N...|                1715.633|
|Central America a...|                 1395.53|
|Australia and Oce...|                1049.802|
|       North America|                 335.352|
+--------------------+------------------------+



from pyspark.sql import functions
out1=out.withColumn("id", functions.monotonically_increasing_id())

In [19]:
out.show()

+--------------------+------------------------+
|              Region|Total_Revenue_in_Billion|
+--------------------+------------------------+
|              Europe|                3481.372|
|  Sub-Saharan Africa|                3350.556|
|                Asia|                2005.306|
|Middle East and N...|                1715.633|
|Central America a...|                 1395.53|
|Australia and Oce...|                1049.802|
|       North America|                 335.352|
+--------------------+------------------------+



In [20]:
out.createOrReplaceTempView("Region_Revenue")

In [21]:
query="""select Region,Total_Revenue_in_Billion,row_number() over(order by Total_Revenue_in_Billion desc ) as row
    from Region_Revenue
    limit 5
    """
out=fetch(query)
out.show()

+--------------------+------------------------+---+
|              Region|Total_Revenue_in_Billion|row|
+--------------------+------------------------+---+
|              Europe|                3481.372|  1|
|  Sub-Saharan Africa|                3350.556|  2|
|                Asia|                2005.306|  3|
|Middle East and N...|                1715.633|  4|
|Central America a...|                 1395.53|  5|
+--------------------+------------------------+---+



In [22]:
out.select(['Region','Total_Revenue_in_Billion']).filter(out['row'] >2).show()

+--------------------+------------------------+
|              Region|Total_Revenue_in_Billion|
+--------------------+------------------------+
|                Asia|                2005.306|
|Middle East and N...|                1715.633|
|Central America a...|                 1395.53|
+--------------------+------------------------+



In [23]:
from glob import glob
glob('*.parquet')

['file.parquet', 'filefrompandas.parquet']

In [43]:
data.filter(data['Region']=='Asia').show(5)

+------+----------+-------------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|Region|   Country|    Item_Type|Sales_Channel|Order_Priority|Order_Date| Order_ID| Ship_Date|Units_Sold|Unit_Price|Unit_Cost|Total_Revenue|Total_Cost|Total_Profit|
+------+----------+-------------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|  Asia|      Laos|   Vegetables|       Online|             L| 2/20/2011|844532620| 3/20/2011|      4825|    154.06|    90.93|     743339.5| 438737.25|   304602.25|
|  Asia|     China|    Baby Food|       Online|             C|04-10-2017|564251220|05-12-2017|      3330|    255.28|   159.42|     850082.4|  530868.6|    319213.8|
|  Asia|     India|    Household|       Online|             C| 8/27/2016|151717174|09-02-2016|      5338|    668.27|   502.54|   3567225.26|2682558.52|   884666.74|
|  Asia|  

In [65]:
out2=data['Region',].distinct().show(data.count(),False)
out2

+---------------------------------+
|Region                           |
+---------------------------------+
|Middle East and North Africa     |
|Australia and Oceania            |
|Europe                           |
|Sub-Saharan Africa               |
|Central America and the Caribbean|
|North America                    |
|Asia                             |
+---------------------------------+



In [63]:
#pq.write_table(data.filter(data['Region']=='Asia'),"Asia.parquet")
type(out2)

NoneType