## Overview

This notebook will show you how to create and query a table or DataFrame that you uploaded to DBFS. [DBFS](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html) is a Databricks File System that allows you to store data for querying inside of Databricks. This notebook assumes that you have a file already inside of DBFS that you would like to read from.

This notebook is written in **Python** so the default cell type is Python. However, you can use different languages by using the `%LANGUAGE` syntax. Python, Scala, SQL, and R are all supported.

In [0]:
# File location and type
file_location = "/FileStore/tables/5000_Sales_Records.csv"

In [0]:
file_rdd = sc.textFile('/FileStore/tables/5000_Sales_Records.csv')
line_rdd = file_rdd.map(lambda x:x.split(',')) 
line_rdd = line_rdd.filter(lambda x:x[0]!='Region')
line_rdd.cache()

Out[15]: PythonRDD[105] at RDD at PythonRDD.scala:58

In [0]:
#Display the number of countries present in the data
total_countries = line_rdd.map(lambda x:x[1]).distinct().count()
total_countries

Out[16]: 185

In [0]:
#Display the number of units sold in each region
region_units_rdd = line_rdd.map(lambda x:(x[0],x[8])).reduceByKey(lambda a,b:int(a)+int(b))
region_units_rdd.collect()

Out[17]: [('Asia', 3620036),
 ('Middle East and North Africa', 3013431),
 ('Australia and Oceania', 2111786),
 ('Central America and the Caribbean', 2698776),
 ('Europe', 6582322),
 ('Sub-Saharan Africa', 6642380),
 ('North America', 484760)]

In [0]:
region_units_rdd.saveAsTextFile("/FileStore/tables/region_units")

[0;31m---------------------------------------------------------------------------[0m
[0;31mPy4JJavaError[0m                             Traceback (most recent call last)
[0;32m<command-1221725612017345>[0m in [0;36m<cell line: 1>[0;34m()[0m
[0;32m----> 1[0;31m [0mregion_units_rdd[0m[0;34m.[0m[0msaveAsTextFile[0m[0;34m([0m[0;34m"/FileStore/tables/region_units"[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m
[0;32m/databricks/spark/python/pyspark/instrumentation_utils.py[0m in [0;36mwrapper[0;34m(*args, **kwargs)[0m
[1;32m     46[0m             [0mstart[0m [0;34m=[0m [0mtime[0m[0;34m.[0m[0mperf_counter[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[1;32m     47[0m             [0;32mtry[0m[0;34m:[0m[0;34m[0m[0;34m[0m[0m
[0;32m---> 48[0;31m                 [0mres[0m [0;34m=[0m [0mfunc[0m[0;34m([0m[0;34m*[0m[0margs[0m[0;34m,[0m [0;34m**[0m[0mkwargs[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m[1;32m     49[0m       

In [0]:
#Display the 10 most recent sales
def date_format(date):
    split_date = date.split('/')
    if len(split_date[0]) == 1:
        split_date[0] = '0'+split_date[0]
    if len(split_date[1]) == 1:
        split_date[1] = '0'+split_date[1]
    date = split_date[2]+split_date[0]+split_date[1]
    return date



In [0]:
date_rdd = line_rdd.map(lambda x:(x,date_format(x[5])))
top_ten_rdd = date_rdd.sortBy(lambda x:x[1],ascending=False)
top_ten = top_ten_rdd.take(10)
top_ten

Out[19]: [(['Asia',
   'Bhutan',
   'Cereal',
   'Offline',
   'M',
   '7/28/2017',
   '223854434',
   '8/25/2017',
   '2356',
   '205.70',
   '117.11',
   '484629.20',
   '275911.16',
   '208718.04'],
  '20170728'),
 (['Sub-Saharan Africa',
   'Senegal',
   'Cosmetics',
   'Online',
   'C',
   '7/26/2017',
   '537970721',
   '8/18/2017',
   '6346',
   '437.20',
   '263.33',
   '2774471.20',
   '1671092.18',
   '1103379.02'],
  '20170726'),
 (['Middle East and North Africa',
   'United Arab Emirates',
   'Household',
   'Online',
   'C',
   '7/26/2017',
   '419542396',
   '8/8/2017',
   '773',
   '668.27',
   '502.54',
   '516572.71',
   '388463.42',
   '128109.29'],
  '20170726'),
 (['Australia and Oceania',
   'Australia',
   'Beverages',
   'Online',
   'L',
   '7/26/2017',
   '631485402',
   '8/12/2017',
   '9418',
   '47.45',
   '31.79',
   '446884.10',
   '299398.22',
   '147485.88'],
  '20170726'),
 (['Sub-Saharan Africa',
   "Cote d'Ivoire",
   'Vegetables',
   'Online',
   'H'

In [0]:
top_sales_rdd = sc.parallelize(top_ten,1)
top_sales_rdd.saveAsTextFile("/FileStore/tables/top_ten_sales")


In [0]:
#Display the products with atleast 2 occurences of 'a'
line_rdd.map(lambda x:x[2]).filter(lambda x:x.count('a')>=2).distinct().collect()

Out[21]: ['Personal Care']

In [0]:
# Display country in each region with highest units sold
country_units_rdd = line_rdd.map(lambda x:((x[0],x[1]),x[8]))
country_sum_units = country_units_rdd.reduceByKey(lambda a,b:a+b)
country_units_sorted = country_sum_units.sortBy(lambda x:x[1],ascending = False) 
highest_units = country_units_sorted.map(lambda x:(x[0][0],(x[0][1],x[1]))).reduceByKey(lambda a,b:max(a,b,key = lambda x:x[1])).sortBy(lambda x:x[1][1],ascending=False)
highest_units.collect()

Out[22]: [('Europe',
  ('Hungary',
   '9968685255409495918868287807650977858098555223418179519342848151145631269367410030867259717125482359')),
 ('Middle East and North Africa',
  ('Yemen',
   '9864651342995148153726825544529044120043263886753749842753643407375744675449414535679')),
 ('Sub-Saharan Africa',
  ('Comoros',
   '98595578898011747939866092753548308145113476528467725131710163526145760461934469467627752656715473')),
 ('Asia',
  ('Malaysia',
   '9762677520108255541437297276069260931366445983652672478740745477252117959149791419414824823722935512135664')),
 ('Australia and Oceania',
  ('Nauru',
   '95242003509883841579457261662995445297639971276350715065504301275875627769353735851472641770198471')),
 ('Central America and the Caribbean',
  ('El Salvador',
   '94163476954094446499296971532337976915463028965291736156619850823957698920291569587289821075997')),
 ('North America',
  ('Greenland',
   '9025989814153475796568345568134815983289659025770741064455305097972312557028192708730

In [0]:
#Display the unit price and unit cost of each item in ascending order
data_rdd = line_rdd.map(lambda x:(x[2],float(x[9]),float(x[10])))
product_price_cost = data_rdd.distinct().sortBy(lambda x:(x[1],x[2]))
product_price_cost.collect()

Out[23]: [('Fruits', 9.33, 6.92),
 ('Beverages', 47.45, 31.79),
 ('Personal Care', 81.73, 56.67),
 ('Clothes', 109.28, 35.84),
 ('Snacks', 152.58, 97.44),
 ('Vegetables', 154.06, 90.93),
 ('Cereal', 205.7, 117.11),
 ('Baby Food', 255.28, 159.42),
 ('Meat', 421.89, 364.69),
 ('Cosmetics', 437.2, 263.33),
 ('Office Supplies', 651.21, 524.96),
 ('Household', 668.27, 502.54)]

In [0]:
#Display the number of sales yearwise
year_units_rdd = line_rdd.map(lambda x:(x[5][-4:],int(x[8])))
year_wise_sales = year_units_rdd.reduceByKey(lambda a,b:a+b).sortBy(lambda x:x[0])
year_wise_sales.collect()

Out[24]: [('2010', 3130137),
 ('2011', 3352394),
 ('2012', 3485045),
 ('2013', 3358584),
 ('2014', 3214899),
 ('2015', 3506548),
 ('2016', 3280818),
 ('2017', 1825066)]

In [0]:
#Display the number of orders for each item
item_orders = line_rdd.map(lambda x:(x[2],1)).reduceByKey(lambda a,b:a+b)
item_orders.collect()

Out[25]: [('Baby Food', 445),
 ('Snacks', 398),
 ('Cereal', 385),
 ('Clothes', 386),
 ('Cosmetics', 424),
 ('Fruits', 447),
 ('Beverages', 447),
 ('Personal Care', 415),
 ('Office Supplies', 420),
 ('Meat', 399),
 ('Vegetables', 410),
 ('Household', 424)]