# Environment setup and miscellaneous

1. Change the Jupyter theme (uses the `jupyterthemes` package) <br>
`!jt -t grade3`

2. Set up a new environment variable (Windows)
    - New System variable
    - Variable name: e.g. SPARK_HOME
    - Variable value: e.g. C:\spark1.6
    - Edit Path -> New -> %SPARK_HOME%\bin

3. Fix Hive's metastore issue (MySQL-backed metastore)
    - mysql -u root -p
    - create database metastore;
    - SET GLOBAL time_zone = '+3:00';
    - schematool -initSchema -dbType mysql

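4. Run a script that takes command-line arguments (`sys.argv`) from PyCharm
    - Run -> Edit Configurations
    - Script path: the script to run
    - Parameters: the arguments, separated by spaces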

5. Setting up PyCharm with Spark
    - Settings
    - Project -> Project Structure
    - Add Content Root -> C:\spark\python
    - Add Content Root -> C:\spark\python\lib\py4j

# HDFS

1. Properties files
    - `/hadoop/etc/hadoop/core-site.xml`
    - `/hadoop/etc/hadoop/hdfs-site.xml`

2. Properties
    - `fs.defaultFS` -> the default file system URI (the NameNode address)
    - `dfs.blocksize` -> the block size into which files are split
    - `dfs.replication` -> the number of replicas kept for each block

3. Web interface
    - `http://localhost:9870`

4. Commands
    - `ls -ltr | grep dbratu` - list files (long format, oldest first) and keep only the lines matching a pattern
    - `export SPARK_MAJOR_VERSION=2` - run Spark 2.x
    - `hostname -f` - show the fully qualified hostname
    - `winutils chmod 777 /tmp/hive` - fix the `/tmp/hive` permission error in spark-shell on Windows
    - `tar xzvf <file_path>` - eXtract, gunZip, Verbose, from the archive File
    - `du -sh` - show the total size, human-readable
    - `wc -l /Users/dbratu/Documents/big_data/data-master/retail_db/*/*` - count the lines (records) in each file
    - `hadoop fs` - the HDFS command line interface (lists the available subcommands)
    - `hadoop fs -ls` - list files
    - `hadoop fs -tail <path>` - show the last kilobyte of a file
    - `hadoop fs -ls -R` - list files recursively, including subdirectories
    - `hadoop fs -du -s -h` - show the total size, human-readable
    - `hadoop fsck /user/dbratu/data -files -locations` - list the files and where their blocks are stored
    - `hadoop fs -get <source> <destination>` - copy from HDFS to the local file system
    - `hadoop fs -copyToLocal <source> <destination>` - same as `-get`

5. Yarn (Yet Another Resource Negotiator)
    - Properties files
        - `/hadoop/etc/hadoop/yarn-site.xml`
        - `/spark/conf/spark-env.sh`
    - Web interface
        - `http://localhost:8088/cluster`

6. Compression
    - open `/etc/hadoop/conf/core-site.xml`
    - search for the `io.compression.codecs` property, which lists the available codecs, e.g.:
        - `org.apache.hadoop.io.compress.GzipCodec`
        - `org.apache.hadoop.io.compress.DefaultCodec`
        - `org.apache.hadoop.io.compress.SnappyCodec`
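To make `dfs.blocksize` and `dfs.replication` concrete, here is a back-of-the-envelope sketch in plain Python. It assumes the common defaults of 128 MB blocks and 3 replicas; `hdfs_footprint` is a hypothetical helper, not part of Hadoop.

```python
import math

# Assumed defaults, for illustration only.
BLOCK_SIZE_MB = 128
REPLICATION = 3


def hdfs_footprint(file_size_mb, block_size_mb=BLOCK_SIZE_MB, replication=REPLICATION):
    """Return (number of blocks, number of stored block replicas, raw MB used)."""
    blocks = math.ceil(file_size_mb / block_size_mb)
    # The last block only takes as much disk as the data it holds,
    # so raw usage is file size * replication, not blocks * block size.
    return blocks, blocks * replication, file_size_mb * replication


print(hdfs_footprint(1000))  # (8, 24, 3000)
```

So, under these assumptions, a 1000 MB file becomes 8 blocks stored as 24 block replicas, using about 3000 MB of raw disk.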

# Spark

## Information
- spark.apache.org
- Spark 2 favours the DataFrame API over RDDs
- a DataFrame is an RDD with a structure (a schema of named, typed columns)
- Start spark-sql session:<br>
`spark-sql --conf spark.ui.port=4040`
- Execution engines:
    - Hive: MapReduce
    - spark-sql: Spark
- In the Hive CLI there is no `hadoop fs`; use `dfs` instead

- Metastore warehouse directory (prints the current value)<br>
`set hive.metastore.warehouse.dir;`

## CLI commands
- `spark-submit`: accepts the same parameters as `pyspark` below
- `pyspark`
    - `--master yarn`
    - `--conf spark.ui.port=4040`: the web interface is at http://localhost:4040/ (the default port)
    - `--num-executors 2`
    - `--executor-cores 2`
    - `--executor-memory 512M`

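## Port Numbers
Default web UI ports in Hadoop 1.x (the NameNode port is also used by 2.x; Hadoop 3 moved it to 9870):<br>
NameNode – Port 50070 <br>
TaskTracker – Port 50060<br>
JobTracker – Port 50030<br>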

## pyspark

### Config
- Set the number of partitions (and therefore tasks) used when data is shuffled for joins and aggregations<br>
`spark.conf.set('spark.sql.shuffle.partitions', 'x')`

### Actions and transformations (RDD)

#### Read a file

In [None]:
orders = sc.textFile("/path/", minPartitions=18)  # minimum number of partitions (file splits), i.e. tasks

#### Create rdd from a list

In [None]:
rdd_list = [1, 2, 3, 4]
rdd = sc.parallelize(rdd_list)
rdd.take(10)

#### Reading different file formats using sqlContext

In [None]:
sqlContext.read.text("/path/")
sqlContext.read.json("/path/")
sqlContext.read.orc("/path/")
sqlContext.read.parquet("/path/")

#### Map

In [None]:
rdd_map = rdd.map(lambda x: x.split(",")[-1])

#### Flatmap

In [None]:
list_of_words = ['Dragos Bratu Florian', 'Bratu Dragos', "Dragos Meltzer"]
list_rdd = sc.parallelize(list_of_words)
words = list_rdd.flatMap(lambda x: x.split(" "))
print(words.collect())  # ['Dragos', 'Bratu', 'Florian', 'Bratu', 'Dragos', 'Dragos', 'Meltzer']
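The difference between `map` and `flatMap` can be reproduced in plain Python without Spark. This sketch uses the same list of strings; the list comprehension stands in for `map`, and `itertools.chain.from_iterable` for the flattening that `flatMap` does.

```python
from itertools import chain

list_of_words = ['Dragos Bratu Florian', 'Bratu Dragos', "Dragos Meltzer"]

# map: exactly one output per input -> a list of lists
mapped = [line.split(" ") for line in list_of_words]

# flatMap: each input can produce several outputs, flattened into one list
flat = list(chain.from_iterable(line.split(" ") for line in list_of_words))

print(mapped)  # [['Dragos', 'Bratu', 'Florian'], ['Bratu', 'Dragos'], ['Dragos', 'Meltzer']]
print(flat)    # ['Dragos', 'Bratu', 'Florian', 'Bratu', 'Dragos', 'Dragos', 'Meltzer']
```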

#### Filter

In [None]:
orders = sc.textFile("/Users/dbratu/Documents/big_data/data-master/retail_db/orders")
filtered = orders.filter(lambda x: x.split(",")[3] in ["CLOSED", "COMPLETE"])
print(*filtered.take(3), sep="\n")

#### Inner join
(k, v) join (k, w) -> (k, (v, w))

In [None]:
orders = sc.textFile("/Users/dbratu/Documents/big_data/data-master/retail_db/orders")
order_items = sc.textFile("/Users/dbratu/Documents/big_data/data-master/retail_db/order_items")
products = sc.textFile("/Users/dbratu/Documents/big_data/data-master/retail_db/products")

orders_map = orders.map(lambda x: (int(x.split(",")[0]), x.split(",")[1]))
order_items_map = order_items.map(lambda x: (int(x.split(",")[1]), float(x.split(",")[4])))

orders_join = orders_map.join(order_items_map)
print(*orders_join.take(3), sep="\n")
# (2, ('2013-07-25 00:00:00.0', 199.99))
# (2, ('2013-07-25 00:00:00.0', 250.0))
# (2, ('2013-07-25 00:00:00.0', 129.99))

#### Outer join

In [None]:
orders_LO_join = orders_map.leftOuterJoin(order_items_map)
orders_RO_join = orders_map.rightOuterJoin(order_items_map)
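A plain-Python sketch of the pair-RDD join semantics, `(k, v) join (k, w) -> (k, (v, w))`, and of the left outer variant. The helper functions and the small input lists are hypothetical, chosen only to show matched and unmatched keys; Spark itself does not guarantee the output order.

```python
from collections import defaultdict


def _index(pairs):
    """Group the values of (key, value) pairs by key."""
    index = defaultdict(list)
    for k, w in pairs:
        index[k].append(w)
    return index


def inner_join(left, right):
    """Like RDD.join: one (k, (v, w)) for every matching pair of values."""
    index = _index(right)
    return [(k, (v, w)) for k, v in left for w in index.get(k, [])]


def left_outer_join(left, right):
    """Like RDD.leftOuterJoin: left keys without a match get (v, None)."""
    index = _index(right)
    return [(k, (v, w)) for k, v in left for w in index.get(k, [None])]


# Hypothetical (order_id, order_date) and (order_id, subtotal) pairs.
orders_pairs = [(1, "2013-07-25"), (2, "2013-07-25"), (3, "2013-07-26")]
items_pairs = [(2, 199.99), (2, 250.0), (4, 10.0)]

print(inner_join(orders_pairs, items_pairs))
print(left_outer_join(orders_pairs, items_pairs))
```

Order 4 exists only on the right side, so it appears in neither result; `rightOuterJoin` would keep it as `(4, (None, 10.0))`.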

#### count()

In [None]:
orders.count()

#### reduce()

In [None]:
result = order_items.filter(lambda oi: int(oi.split(",")[1]) == 2).map(lambda x: float(x.split(",")[4])).reduce(lambda x, y: x+y)

#### countByKey()

In [None]:
order_status = orders.map(lambda x: (x.split(",")[-1], 1))
for item in order_status.countByKey().items():
    print(*item)

#### Combiner (map-side pre-aggregation)
- groupByKey() - doesn't use a combiner: every value is shuffled
- reduceByKey() - uses a combiner: when the same function does the combining and the final reduce
- aggregateByKey() - uses a combiner: when combining and reducing need two different functions (e.g. the result type differs from the value type)
    - without a combiner: (1, 1..1000) -> sum(1, 2, 3, 4, 5, ..., 1000)
    - with a combiner: (1, 1..1000) -> sum(sum(1..250), sum(251..500), sum(501..750), sum(751..1000))
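The `(1, 1..1000)` example can be simulated in plain Python. The split into 4 partitions of 250 values is an assumption for illustration; the point is that with a combiner only one partial result per partition has to be shuffled, while the final total is the same.

```python
from functools import reduce

# Hypothetical layout: the values 1..1000 of key 1 spread over 4 partitions.
partitions = [list(range(1, 251)), list(range(251, 501)),
              list(range(501, 751)), list(range(751, 1001))]

# Without a combiner (groupByKey): every value is shuffled, then summed once.
shuffled_values = [v for part in partitions for v in part]
total_without_combiner = sum(shuffled_values)

# With a combiner (reduceByKey): each partition is pre-summed locally,
# and only the partial sums are shuffled and reduced.
partials = [reduce(lambda x, y: x + y, part) for part in partitions]
total_with_combiner = reduce(lambda x, y: x + y, partials)

print(len(shuffled_values), len(partials))  # 1000 4
print(total_without_combiner, total_with_combiner)  # 500500 500500
```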

#### groupByKey()

In [None]:
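oi_map = order_items.map(lambda x: (int(x.split(",")[1]), float(x.split(",")[4])))
res = oi_map.groupByKey()
# (1, <pyspark.resultiterable.ResultIterable object at 0x7fd5c25283a0>)

# group the full order_item records by order id (used in the next cell)
oi_gr = order_items.map(lambda oi: (int(oi.split(",")[1]), oi)).groupByKey()

for order_id, subtotals in res.take(3):
    print(order_id, list(subtotals))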

#### Sort within each group using flatMap()

In [None]:
oi_sort = oi_gr.flatMap(lambda oi: sorted(oi[1], key=lambda k: float(k.split(",")[4]), reverse=True))
print(*oi_sort.take(10), sep="\n")

#### reduceByKey()

In [None]:
oi_map = order_items.map(lambda x: (int(x.split(",")[1]), float(x.split(",")[4])))
# (1, 299.98)
oi_rdb = oi_map.reduceByKey(lambda x, y: x + y)
print(*oi_rdb.take(3), sep="\n")
# (1, 299.98)
# (2, 579.98)
# (4, 699.85)

#### aggregateByKey()

In [None]:
oi_map = order_items.map(lambda x: (int(x.split(",")[1]), float(x.split(",")[4])))
# (1, 299.98)
oi_agg = oi_map.aggregateByKey((0.0, 0),
                               lambda x, y: (x[0] + y, x[1] + 1),
                               lambda x, y: (x[0] + y[0], x[1] + y[1]))

print(*oi_agg.take(10), sep="\n")
# (1, (299.98, 1))
# (2, (579.98, 3))
# (4, (699.85, 4))
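A plain-Python emulation of `aggregateByKey` helps show why it takes a zero value and two functions. The subtotals of orders 1 and 2 come from the outputs above, but how they are split across two partitions is an assumption. `seq_op` runs inside each partition (combiner side) and `comb_op` merges the per-partition accumulators (reducer side); `aggregate_by_key` is a hypothetical helper, not a Spark API.

```python
def aggregate_by_key(partitions, zero, seq_op, comb_op):
    """Emulate RDD.aggregateByKey over a list of partitions of (key, value) pairs."""
    per_partition = []
    for part in partitions:
        acc = {}
        for k, v in part:
            # the zero value is applied once per key per partition
            acc[k] = seq_op(acc.get(k, zero), v)
        per_partition.append(acc)
    result = {}
    for acc in per_partition:
        for k, a in acc.items():
            result[k] = comb_op(result[k], a) if k in result else a
    return result


# Assumed partitioning of (order_id, subtotal) pairs.
partitions = [[(2, 199.99), (2, 250.0)], [(2, 129.99), (1, 299.98)]]

revenue_and_count = aggregate_by_key(
    partitions,
    (0.0, 0),
    lambda acc, v: (acc[0] + v, acc[1] + 1),   # seqOp: add a value to (sum, count)
    lambda a, b: (a[0] + b[0], a[1] + b[1]),   # combOp: merge two (sum, count) pairs
)
averages = {k: total / n for k, (total, n) in revenue_and_count.items()}
print(revenue_and_count)
print(averages)
```

The (sum, count) accumulator has a different type from the float values, which is exactly the case where `reduceByKey` is not enough.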

#### sortByKey()

In [None]:
prod_map = products.filter(lambda z: z.split(",")[4] != "").map(lambda p: (-float(p.split(",")[4]), p))
# (-59.98, '1,2,Quest Q64 10 FT. x 10 FT. Slant Leg Instant U,,59.98,http://images.acmesports.sports/Quest+Q64+10+FT.+x+10+FT.+Slant+Leg+Instant+Up+Canopy')

prod_sort = prod_map.sortByKey()
# (-1999.99, '208,10,SOLE E35 Elliptical,,1999.99,http://images.acmesports.sports/SOLE+E35+Elliptical')
# (-1799.99, '66,4,SOLE F85 Treadmill,,1799.99,http://images.acmesports.sports/SOLE+F85+Treadmill')
    
prod_sort_map = prod_sort.map(lambda x: x[1])
# 208,10,SOLE E35 Elliptical,,1999.99,http://images.acmesports.sports/SOLE+E35+Elliptical
# 66,4,SOLE F85 Treadmill,,1799.99,http://images.acmesports.sports/SOLE+F85+Treadmill

#### double sort

In [None]:
prod_map = products.filter(lambda z: z.split(",")[4] != "").map(lambda p: ((int(p.split(",")[1]), -float(p.split(",")[4])), p))
# ((2, -59.98), '1,2,Quest Q64 10 FT. x 10 FT. Slant Leg Instant U,,59.98,http://images.acmesports.sports/Quest+Q64+10+FT.+x+10+FT.+Slant+Leg+Instant+Up+Canopy')

# ascending on (category_id, -price): categories ascending, prices descending
prod_sort = prod_map.sortByKey()
# ((2, -299.99), '16,2,Riddell Youth 360 Custom Football Helmet,,299.99,http://images.acmesports.sports/Riddell+Youth+360+Custom+Football+Helmet')
# ((2, -209.99), '11,2,Fitness Gear 300 lb Olympic Weight Set,,209.99,http://images.acmesports.sports/Fitness+Gear+300+lb+Olympic+Weight+Set')
# ((2, -199.99), '5,2,Riddell Youth Revolution Speed Custom Footbal,,199.99,http://images.acmesports.sports/Riddell+Youth+Revolution+Speed+Custom+Football+Helmet')
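The same composite-key trick works with Python's `sorted`: an ascending sort on `(category, -price)` gives categories in ascending order and, within each category, prices in descending order. The rows below are hypothetical and only mimic the shape of the product records.

```python
# Hypothetical (category_id, price, name) rows, for illustration only.
products_rows = [
    (3, 159.99, "ball"),
    (2, 199.99, "helmet B"),
    (2, 299.99, "helmet A"),
    (3, 199.99, "canopy"),
    (2, 209.99, "weights"),
]

# Composite key: category ascending, price descending (negated).
by_cat_then_price = sorted(products_rows, key=lambda p: (p[0], -p[1]))
for row in by_cat_then_price:
    print(row)
```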

#### Ranking
1. takeOrdered() - by default it sorts ascending
2. top() - by default it sorts descending

In [None]:
prod_map = products.filter(lambda z: z.split(",")[4] != "")
top5 = prod_map.takeOrdered(5, key=lambda x: -float(x.split(",")[4]))
# or
top5 = prod_map.top(5, key=lambda x: float(x.split(",")[4]))
# 208,10,SOLE E35 Elliptical,,1999.99,http://images.acmesports.sports/SOLE+E35+Elliptical
# 66,4,SOLE F85 Treadmill,,1799.99,http://images.acmesports.sports/SOLE+F85+Treadmill
# 199,10,SOLE F85 Treadmill,,1799.99,http://images.acmesports.sports/SOLE+F85+Treadmill
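Outside Spark, `heapq` offers the same two views: `nsmallest` with a negated key behaves like `takeOrdered(n, key=-price)`, and `nlargest` like `top(n)`. The price list is hypothetical.

```python
import heapq

prices = [59.98, 1999.99, 129.99, 1799.99, 299.98, 1799.99]  # hypothetical values

# like takeOrdered(3, key=lambda p: -p): ascending on the negated price
take_ordered = heapq.nsmallest(3, prices, key=lambda p: -p)
# like top(3): descending
top3 = heapq.nlargest(3, prices)

print(take_ordered)  # [1999.99, 1799.99, 1799.99]
print(top3)          # [1999.99, 1799.99, 1799.99]
```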

### Get top 3 per category

In [None]:
prod_fmg = products.filter(lambda z: z.split(",")[4] != "").\
    map(lambda p: (int(p.split(",")[1]), p)).\
    groupByKey()
# (2, <pyspark.resultiterable.ResultIterable object at 0x7fad3bae9640>)
# (3, <pyspark.resultiterable.ResultIterable object at 0x7fad3bae94f0>)
# (4, <pyspark.resultiterable.ResultIterable object at 0x7fad3bae94c0>)

top3 = prod_fmg.flatMap(lambda x: sorted(x[1], key=lambda y: float(y.split(",")[4]), reverse=True)[:3])
# 16,2,Riddell Youth 360 Custom Football Helmet,,299.99,http://images.acmesports.sports/Riddell+Youth+360+Custom+Football+Helmet
# 11,2,Fitness Gear 300 lb Olympic Weight Set,,209.99,http://images.acmesports.sports/Fitness+Gear+300+lb+Olympic+Weight+Set
# 5,2,Riddell Youth Revolution Speed Custom Footbal,,199.99,http://images.acmesports.sports/Riddell+Youth+Revolution+Speed+Custom+Football+Helmet
# 40,3,Quik Shade Summit SX170 10 FT. x 10 FT. Canop,,199.99,http://images.acmesports.sports/Quik+Shade+Summit+SX170+10+FT.+x+10+FT.+Canopy
# 32,3,PUMA Men's evoPOWER 1 Tricks FG Soccer Cleat,,189.99,http://images.acmesports.sports/PUMA+Men%27s+evoPOWER+1+Tricks+FG+Soccer+Cleat
# 35,3,adidas Brazuca 2014 Official Match Ball,,159.99,http://images.acmesports.sports/adidas+Brazuca+2014+Official+Match+Ball

### Top 3 distinct prices per category (ties included)

In [None]:
prod_fmg = products.filter(lambda z: z.split(",")[4] != "").\
    map(lambda p: (int(p.split(",")[1]), p)).groupByKey()
# (2, <pyspark.resultiterable.ResultIterable object at 0x7fad3bae9640>)

from itertools import takewhile

def take_top(grouped_products, top_N):
    # sort the products of one category by price, highest first
    sorted_prices = sorted(grouped_products[1], key=lambda x: float(x.split(",")[4]), reverse=True)
    # the top_N distinct prices
    top_prices = sorted({float(x.split(",")[4]) for x in sorted_prices}, reverse=True)[:top_N]
    # keep products while their price is one of the top_N distinct prices (ties included)
    return takewhile(lambda y: float(y.split(",")[4]) in top_prices, sorted_prices)

result = prod_fmg.flatMap(lambda x: take_top(x, 3))
# 16,2,Riddell Youth 360 Custom Football Helmet,,299.99,http://images.acmesports.sports/Riddell+Youth+360+Custom+Football+Helmet
# 11,2,Fitness Gear 300 lb Olympic Weight Set,,209.99,http://images.acmesports.sports/Fitness+Gear+300+lb+Olympic+Weight+Set
# 5,2,Riddell Youth Revolution Speed Custom Footbal,,199.99,http://images.acmesports.sports/Riddell+Youth+Revolution+Speed+Custom+Football+Helmet
# 14,2,Quik Shade Summit SX170 10 FT. x 10 FT. Canop,,199.99,http://images.acmesports.sports/Quik+Shade+Summit+SX170+10+FT.+x+10+FT.+Canopy
# 40,3,Quik Shade Summit SX170 10 FT. x 10 FT. Canop,,199.99,http://images.acmesports.sports/Quik+Shade+Summit+SX170+10+FT.+x+10+FT.+Canopy
# 32,3,PUMA Men's evoPOWER 1 Tricks FG Soccer Cleat,,189.99,http://images.acmesports.sports/PUMA+Men%27s+evoPOWER+1+Tricks+FG+Soccer+Cleat
# 35,3,adidas Brazuca 2014 Official Match Ball,,159.99,http://images.acmesports.sports/adidas+Brazuca+2014+Official+Match+Ball
# 48,3,adidas Brazuca Final Rio Official Match Ball,,159.99,http://images.acmesports.sports/adidas+Brazuca+Final+Rio+Official+Match+Ball
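The `take_top` logic is plain Python, so it can be checked without a cluster on a small hypothetical group shaped like `(category_id, records)`. Unlike the `[:3]` slice in the previous section, products tied on the third-highest price are all kept.

```python
from itertools import takewhile


def take_top(grouped_products, top_n):
    """Return every record whose price is among the top_n distinct prices of the group."""
    sorted_products = sorted(grouped_products[1], key=lambda x: float(x.split(",")[4]), reverse=True)
    top_prices = sorted({float(x.split(",")[4]) for x in sorted_products}, reverse=True)[:top_n]
    return list(takewhile(lambda y: float(y.split(",")[4]) in top_prices, sorted_products))


# Hypothetical group: CSV records with the price in column 4.
group = (2, [
    "1,2,A,,59.98,url",
    "16,2,B,,299.99,url",
    "5,2,C,,199.99,url",
    "14,2,D,,199.99,url",
    "11,2,E,,209.99,url",
])

for record in take_top(group, 3):
    print(record)  # B, E, C, D: C and D share the third price
```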

### Set operations

In [None]:
orders201312 = orders.filter(lambda o: o.split(",")[1][:7] == "2013-12").\
    map(lambda x: (int(x.split(",")[0]), x))
orders201401 = orders.filter(lambda o: o.split(",")[1][:7] == "2014-01").\
    map(lambda x: (int(x.split(",")[0]), x))
# (25876, '25876,2014-01-01 00:00:00.0,3414,PENDING_PAYMENT')

order_items_m = order_items.map(lambda oi: (int(oi.split(",")[1]), oi))
oi2013 = orders201312.join(order_items_m).map(lambda x: x[1][1])
oi2014 = orders201401.join(order_items_m).map(lambda x: x[1][1])
# 52252,20916,957,1,299.98,299.98
# 52253,20916,365,2,119.98,59.99
# 52254,20916,897,5,124.95,24.99

prod_13 = oi2013.map(lambda x: int(x.split(",")[2]))
prod_14 = oi2014.map(lambda x: int(x.split(",")[2]))

#### union

In [None]:
allprod = prod_13.union(prod_14)  # duplicates are kept
print(allprod.count())  # 29395
allprod = prod_13.union(prod_14).distinct()  # distinct records
print(allprod.count())  # 100

#### intersection

In [None]:
inters = prod_13.intersection(prod_14) # distinct is implicit
print(*inters.take(10), sep="\n")
print(inters.count())

#### minus

In [None]:
only13 = prod_13.subtract(prod_14) # distinct not implicit
only14 = prod_14.subtract(prod_13)
print(only13.collect()) # [127, 127, 127]
print(only14.collect()) # [58, 58]
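A plain-Python picture of which RDD set operations remove duplicates. The product ids are hypothetical, chosen so that the `subtract` results match the shape of the outputs above.

```python
prod_13 = [957, 365, 127, 127, 127, 957]  # hypothetical product ids
prod_14 = [957, 58, 58, 365]

set_13, set_14 = set(prod_13), set(prod_14)

union_all = prod_13 + prod_14                       # union(): duplicates kept
union_distinct = set_13 | set_14                    # union().distinct()
intersection = set_13 & set_14                      # intersection(): distinct is implicit
only_13 = [p for p in prod_13 if p not in set_14]   # subtract(): duplicates kept
only_14 = [p for p in prod_14 if p not in set_13]

print(len(union_all), union_distinct, intersection)
print(only_13, only_14)  # [127, 127, 127] [58, 58]
```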

### Save to HDFS

#### text file

In [None]:
allprod.coalesce(2).saveAsTextFile("/Users/dbratu/Documents/big_data/all2",
                                   compressionCodecClass="org.apache.hadoop.io.compress.SnappyCodec")
# the output directory must not exist yet (Spark creates it)
# coalesce(2) -> only 2 part files as output
# compressed with Snappy

#### other formats
First the RDD has to be converted into a DataFrame.

In [None]:
df = rdd.toDF(["col1", "col2"])
df.save("<path>", "json")  # old Spark 1.x API

df.write.json("<path>")  # Spark 2.x writer API
df.write.json("<path>", compression="gzip")  # with compression
df.write.json("<path>", compression="snappy")

#### avro

In [None]:
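# start pyspark with the Avro package (shell command)
pyspark \
    --master yarn \
    --conf spark.ui.port=4040 \
    --num-executors 2 \
    --executor-memory 512M \
    --packages com.databricks:spark-avro_2.10:2.0.1
# or, instead of --packages: --jars <path_to_jar>

# the format name uses dots, unlike the Maven coordinates above
data_df.write.format("com.databricks.spark.avro").save("<path>")

# check data
sqlContext.read.format("com.databricks.spark.avro").load("<path>")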

### Actions and transformations (DF)

In [None]:
from pyspark import SparkContext
from pyspark.sql import SparkSession

spark = SparkSession.\
    builder.\
    appName("DailyRevenue").\
    master("local").\
    getOrCreate()  # in the app below the master comes from props.get(env, "executionMode")
# or
sc = SparkContext(master="local", appName="Spark Demo")
spark = SparkSession(sc)

#### Split and explode (word count)

In [None]:
from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import split, explode
spark = SparkSession.\
    builder.\
    appName("DailyRevenue").\
    master("local").\
    getOrCreate()
data = spark.read.text("/Users/dbratu/Documents/notite.sql")
wc = data.select(explode(split(data.value, " ")).alias("words")).groupBy("words").count()
wc.write.csv("/Users/dbratu/Documents/wc")

#### Read a file

In [None]:
ordersDF = spark.\
    read.\
    csv("/Users/dbratu/Documents/big_data/data-master/retail_db/orders")

ordersFRM = spark.read.\
    format('csv').\
    option('sep', ',').\
    schema('order_id int, order_date string, order_customer_id int, order_status string').\
    load('/Users/dbratu/Documents/big_data/data-master/retail_db/orders')

#### Show records

In [None]:
orders.take(5)  # RDD: first 5 lines
ordersDF.select("_c0", "_c1", "_c2", "_c3").show(5)
# +---+--------------------+-----+---------------+
# |_c0|                 _c1|  _c2|            _c3|
# +---+--------------------+-----+---------------+
# |  1|2013-07-25 00:00:...|11599|         CLOSED|
# |  2|2013-07-25 00:00:...|  256|PENDING_PAYMENT|
# |  3|2013-07-25 00:00:...|12111|       COMPLETE|
# |  4|2013-07-25 00:00:...| 8827|         CLOSED|
# |  5|2013-07-25 00:00:...|11318|       COMPLETE|
# +---+--------------------+-----+---------------+

ordersDF.select("_c0", "_c1", "_c2", "_c3").show(5, False)

# +---+---------------------+-----+---------------+
# |_c0|_c1                  |_c2  |_c3            |
# +---+---------------------+-----+---------------+
# |1  |2013-07-25 00:00:00.0|11599|CLOSED         |
# |2  |2013-07-25 00:00:00.0|256  |PENDING_PAYMENT|
# |3  |2013-07-25 00:00:00.0|12111|COMPLETE       |
# |4  |2013-07-25 00:00:00.0|8827 |CLOSED         |
# |5  |2013-07-25 00:00:00.0|11318|COMPLETE       |
# +---+---------------------+-----+---------------+

#### Describe

In [None]:
ordersDF.describe().show()
# no schema was given, so every column is a string: min/max compare text (hence max _c0 = 9999)
# +-------+------------------+--------------------+-----------------+---------------+
# |summary|               _c0|                 _c1|              _c2|            _c3|
# +-------+------------------+--------------------+-----------------+---------------+
# |  count|             68883|               68883|            68883|          68883|
# |   mean|           34442.0|                null|6216.571098819738|           null|
# | stddev|19884.953633337947|                null|3586.205241263963|           null|
# |    min|                 1|2013-07-25 00:00:...|                1|       CANCELED|
# |    max|              9999|2014-07-24 00:00:...|             9999|SUSPECTED_FRAUD|
# +-------+------------------+--------------------+-----------------+---------------+

#### Register a temporary view

In [None]:
ordersDF.createTempView('orders')
spark.sql("select * from orders limit 3").show()

# +---+--------------------+-----+---------------+
# |_c0|                 _c1|  _c2|            _c3|
# +---+--------------------+-----+---------------+
# |  1|2013-07-25 00:00:...|11599|         CLOSED|
# |  2|2013-07-25 00:00:...|  256|PENDING_PAYMENT|
# |  3|2013-07-25 00:00:...|12111|       COMPLETE|
# +---+--------------------+-----+---------------+

#### Read hive table

In [None]:
ordersTB = spark.read.table("dbratu.orders")
spark.sql("select * from dbratu.orders limit 10").show()

#### Substring

In [None]:
from pyspark.sql.functions import substring

# positions are 1-based; assumes ordersDF has named columns (read with a schema, like ordersFRM)
ordersDF.select(substring(ordersDF.order_date, 1, 7).alias("Dra")).show(5)
# +-------+
# |    Dra|
# +-------+
# |2013-07|
# |2013-07|
# |2013-07|
# |2013-07|
# |2013-07|
# +-------+

#### Add a new column (withColumn)

In [None]:
ordersDF.withColumn("new_col", substring(ordersDF.order_date, 1, 7)).show(2)
# +--------+--------------------+-----------------+---------------+-------+
# |order_id|          order_date|order_customer_id|   order_status|new_col|
# +--------+--------------------+-----------------+---------------+-------+
# |       1|2013-07-25 00:00:...|            11599|         CLOSED|2013-07|
# |       2|2013-07-25 00:00:...|              256|PENDING_PAYMENT|2013-07|
# +--------+--------------------+-----------------+---------------+-------+

#### SelectExpr

In [None]:
ordersDF.select(substring(ordersDF.order_date, 1, 7).alias("order_month")).show(3)
# equivalent, written as a SQL expression:
ordersDF.selectExpr("substring(order_date, 1, 7) as order_month").show(3)
# +-----------+
# |order_month|
# +-----------+
# |    2013-07|
# |    2013-07|
# |    2013-07|
# +-----------+

#### Filtering

In [None]:
ordersDF.filter(ordersDF.order_status == "COMPLETE").show(5)
# or
ordersDF.filter("order_status = 'COMPLETE'").show(5)
# +--------+--------------------+-----------------+------------+
# |order_id|          order_date|order_customer_id|order_status|
# +--------+--------------------+-----------------+------------+
# |       3|2013-07-25 00:00:...|            12111|    COMPLETE|
# |       5|2013-07-25 00:00:...|            11318|    COMPLETE|
# |       6|2013-07-25 00:00:...|             7130|    COMPLETE|
# |       7|2013-07-25 00:00:...|             4530|    COMPLETE|
# |      15|2013-07-25 00:00:...|             2568|    COMPLETE|
# +--------+--------------------+-----------------+------------+


# OR: |
ordersDF.filter((ordersDF.order_status == "COMPLETE") | (ordersDF.order_status == "CLOSED")).show(5)
# isin()
ordersDF.filter(ordersDF.order_status.isin("COMPLETE", "CLOSED")).show(5)

# SQL IN
ordersDF.filter("order_status in ('COMPLETE', 'CLOSED')").show(5)

# isin() and like() combined with AND: &
ordersDF.filter((ordersDF.order_status.isin("COMPLETE", "CLOSED")) &
                (ordersDF.order_date.like("2013-08%"))).show(5)

# the same in SQL
ordersDF.filter("order_status in ('COMPLETE', 'CLOSED') and order_date like '2013-08%'").show(5)

# date_format: orders placed on the first day of a month
from pyspark.sql.functions import date_format
ordersDF.filter(date_format(ordersDF.order_date, 'dd') == '01').select('order_date').distinct().show(100, False)
ordersDF.filter("date_format(order_date, 'dd') = '01'").select('order_date').distinct().show(100, False)

# +---------------------+
# |order_date           |
# +---------------------+
# |2014-07-01 00:00:00.0|
# |2013-08-01 00:00:00.0|
# |2014-03-01 00:00:00.0|
# |2014-01-01 00:00:00.0|
# |2014-05-01 00:00:00.0|
# |2013-11-01 00:00:00.0|
# |2014-02-01 00:00:00.0|
# |2013-10-01 00:00:00.0|
# |2013-12-01 00:00:00.0|
# |2014-04-01 00:00:00.0|
# |2013-09-01 00:00:00.0|
# |2014-06-01 00:00:00.0|
# +---------------------+

#### joining datasets

In [None]:
ordersDF_f = ordersDF.where(ordersDF.order_status.isin("CLOSED", "COMPLETE"))
ordersDF_j = ordersDF_f.join(order_itemsDF, ordersDF_f.order_id == order_itemsDF.order_item_order_id, "inner")

# orders without any matching order items (left join, then keep the unmatched rows)
ordersDF.join(order_itemsDF, ordersDF.order_id == order_itemsDF.order_item_order_id, 'left').where("order_item_order_id is null").count()

#### countDistinct

In [None]:
from pyspark.sql.functions import countDistinct

ordersDF.select(countDistinct(ordersDF.order_status).alias('Counter')).show()
# +-------+
# |Counter|
# +-------+
# |      9|
# +-------+

#### groupBy

In [None]:
from pyspark.sql.functions import sum  # note: shadows Python's built-in sum

# no alias possible
order_itemsDF.groupBy("order_item_order_id").sum("order_item_subtotal").show()

# alias possible with agg()
order_itemsDF.groupBy("order_item_order_id").agg(sum("order_item_subtotal").alias("Dragos")).show()

#### count

In [None]:
from pyspark.sql.functions import count

ordersDF.groupBy("order_status").agg(count("order_status").alias("Counter")).show()
# +---------------+-------+
# |   order_status|Counter|
# +---------------+-------+
# |PENDING_PAYMENT|  15030|
# |       COMPLETE|  22899|
# |        ON_HOLD|   3798|
# | PAYMENT_REVIEW|    729|
# |     PROCESSING|   8275|
# |         CLOSED|   7556|
# |SUSPECTED_FRAUD|   1558|
# |        PENDING|   7610|
# |       CANCELED|   1428|
# +---------------+-------+

### join/groupBy/sum/sort

In [None]:
from pyspark.sql.functions import sum

(ordersDF.join(order_itemsDF, ordersDF.order_id == order_itemsDF.order_item_order_id).
        groupBy("order_date", "order_item_product_id").
        agg(sum("order_item_subtotal").alias("Revenue")).
        show())
# order_date ascending, order_customer_id descending
ordersDF.sort(["order_date", "order_customer_id"], ascending=[1, 0]).show()

### Spark app

In [None]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import configparser as cp, sys

props = cp.RawConfigParser()
props.read("/Users/dbratu/Documents/big_data/application.properties")
env = sys.argv[1]
inputBaseDir = props.get(env, "input.base.dir")
outputBaseDir = props.get(env, "output.base.dir")

# builder.getOrCreate() creates the SparkContext itself; creating one first with
# SparkContext(master="local", ...) would make it ignore executionMode
spark = SparkSession.\
    builder.\
    appName("DailyRevenue").\
    master(props.get(env, "executionMode")).\
    getOrCreate()

spark.conf.set("spark.sql.shuffle.partitions", "2")
spark.sparkContext.setLogLevel("ERROR")

orders = spark.\
    read.\
    format("csv").\
    schema("""order_id int,
     order_date string,
      order_customer_id int,
       order_status string""").\
    load(inputBaseDir +"/orders")

order_items = spark.\
    read.\
    format("csv").\
    schema("""order_item_id int,
     order_item_order_id int,
      order_item_product_id int,
       order_item_quantity int,
        order_item_subtotal float,
         order_item_product_price float""").\
    load(inputBaseDir +"/order_items")

daily_prod_rev = orders.\
    filter("order_status in ('CLOSED', 'COMPLETE')").\
    join(order_items, orders.order_id == order_items.order_item_order_id).\
    groupBy("order_date", "order_item_product_id").\
    agg(round(sum("order_item_subtotal"), 2).alias("revenue"))

daily_prod_sort = daily_prod_rev.sort([daily_prod_rev.order_date, daily_prod_rev.revenue], ascending=[1, 0])

daily_prod_sort.write.csv(outputBaseDir + "/rev1")

### application.properties

In [None]:
[dev]
executionMode = local
input.base.dir = /Users/dbratu/Documents/big_data/data-master/retail_db
output.base.dir = /Users/dbratu/Documents/big_data/app_out

[prod]
executionMode = yarn-client
input.base.dir = /Users/dbratu/Documents/big_data/data-master/retail_db
output.base.dir = /Users/dbratu/Documents/big_data/app_out
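To check how the app resolves these settings, here is a small sketch that feeds the same sections to `configparser` from a string instead of the file path (an assumption so the snippet runs anywhere). `RawConfigParser` lower-cases option names, but `get()` lower-cases the lookup as well, so `"executionMode"` still works.

```python
import configparser

# The [dev]/[prod] sections above, inlined so the sketch is self-contained.
PROPERTIES = """
[dev]
executionMode = local
input.base.dir = /Users/dbratu/Documents/big_data/data-master/retail_db
output.base.dir = /Users/dbratu/Documents/big_data/app_out

[prod]
executionMode = yarn-client
input.base.dir = /Users/dbratu/Documents/big_data/data-master/retail_db
output.base.dir = /Users/dbratu/Documents/big_data/app_out
"""

props = configparser.RawConfigParser()
props.read_string(PROPERTIES)

env = "dev"  # the Spark app reads this from sys.argv[1]
print(props.get(env, "executionMode"))              # local
print(props.get(env, "input.base.dir") + "/orders")
```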

### Analytic (window) functions

#### sum

In [None]:
from pyspark.sql.window import Window

spec = Window.partitionBy(order_items.order_item_order_id)

# show() prints the table itself and returns None, so it is not wrapped in print()
(order_items.withColumn("revenue",
                        round(sum("order_item_subtotal").over(spec), 2)).
    select("order_item_id",
           "order_item_order_id",
           "order_item_subtotal",
           "revenue").show())

#### Count

In [None]:
spec = Window.partitionBy('order_date')

(orders.select("order_id", 
                    "order_date", 
                    "order_customer_id", 
                    "order_status", 
                    count('order_date').over(spec).alias("daily_count")).show())

#### min/max/avg/pct

In [None]:
employees = spark.\
    read.\
    format("csv").\
    option('sep', '\t').\
    schema("""employee_id int,
     f_name string,
      l_name string,
       email string,
        phone_number string,
         hire_date string,
          department string,
           salary float,
            pct string,
             department_id int,
              manager_id int""").\
    load("/Users/dbratu/Documents/big_data/data-master/hr_db/employees")

spec = Window.partitionBy("department_id")

(employees.
      select("employee_id", "department_id", "salary").
      withColumn("salary_expense", sum("salary").over(spec)).
      withColumn("least_salary", min("salary").over(spec)).
      withColumn("maximum_salary", max("salary").over(spec)).
      withColumn("average_salary", avg("salary").over(spec)).
      withColumn("salary_pct", round(employees.salary/sum("salary").over(spec) * 100, 2)).
      sort('department_id').show())

#### lead/lag

In [None]:
spec = Window.partitionBy("department_id").orderBy(employees.salary.desc())

(employees.
      select("employee_id", "department_id", "salary").
      withColumn("next_employee", lead("employee_id").over(spec)).
      withColumn("difference", employees.salary - lead("salary").over(spec)).
      sort('department_id', employees.salary.desc()).show())

##### Make last() work (full window frame)

In [None]:
# with orderBy, the default frame ends at the current row, so last() would just
# return the current row's salary; widen the frame to the whole partition
spec = Window.partitionBy("department_id").\
    orderBy(employees.salary.desc()).\
    rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)

(employees.
      select("employee_id", "department_id", "salary").
      withColumn("last", last("salary", False).over(spec)).
      sort('department_id', employees.salary.desc()).show())

#### ranking

In [None]:
spec = Window.partitionBy("department_id").\
    orderBy(employees.salary.desc())

(employees.
      select("employee_id", "department_id", "salary").
      withColumn("rank", rank().over(spec)).
      withColumn("dense_rank", dense_rank().over(spec)).
      withColumn("row_number", row_number().over(spec)).
      sort('department_id', employees.salary.desc()).show(50))
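How `rank()`, `dense_rank()` and `row_number()` differ on ties can be sketched in plain Python for a single window partition ordered by salary descending. The salaries are hypothetical and `rank_columns` is an illustrative helper, not a Spark function.

```python
def rank_columns(salaries):
    """Return (salary, rank, dense_rank, row_number) rows, highest salary first."""
    ordered = sorted(salaries, reverse=True)
    rows = []
    rank = dense_rank = 0
    previous = None
    for row_number, salary in enumerate(ordered, start=1):
        if salary != previous:
            rank = row_number   # rank skips positions after a tie
            dense_rank += 1     # dense_rank leaves no gaps
            previous = salary
        rows.append((salary, rank, dense_rank, row_number))
    return rows


ranked = rank_columns([9000, 6000, 9000, 4800, 4800, 4200])  # hypothetical salaries
for row in ranked:
    print(row)
```

With two people at 9000, both get rank 1 and dense_rank 1; the next salary gets rank 3 but dense_rank 2, while row_number always counts 1, 2, 3, ...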

## Run as application

In [None]:
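spark-submit \
    --master local \
    --conf spark.ui.port=4040 \
    --num-executors 2 \
    --executor-memory 512M \
    \Users\dbratu\Desktop\demo.py
# --num-executors only applies on a cluster manager such as YARN, not with --master local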

# HIVE

## Queries/Functions

### Create a database
`create database database_name;`

### Use a database
`use database_name;`

### Show tables/databases
`show tables;`<br>
`show databases;`

### Create a table
`create table orders (
order_id int,
order_date string,
order_customer_id int,
order_status string
) row format delimited fields terminated by "," -- the delimiter
stored as textfile; -- these clauses are given in the Hive language manual during the certification`

### Load data into tables:
- `load data local inpath "/Users/dbratu/Documents/big_data/data-master/retail_db/orders" into table orders;` - local files
- `load data inpath "/Users/dbratu/Documents/big_data/hdfs_files/data-master/retail_db/orders" into table orders;` - hdfs files

### Preview data
`select * from orders limit 10;`<br>
`dfs -ls /Users/dbratu/Documents/big_data/hive_files/dbratu.db/orders;`

### Store table as orc format
`create table orders(
order_id int,
order_date string,
order_customer_id int,
order_status string
) stored as orc;`

### Insert into orc table
`insert into table orders select * from dbratu.orders;`

### Information about table
`describe orders;
describe formatted orders;`

### Running hive commands using pyspark
`sqlContext.sql("show tables").show()`

### String manipulation
- substr <br>
`select substr("Dragos", 2, 3);`-> `rag`
- instr <br>
`select instr("Dragos", "r");` -> `2`
- like <br>
`select "la la la tra" like "%tra";` -> `true`
- rlike (regular expressions) <br>
`select "Dragos" rlike "^D.*s$";` -> `true`
- lcase=lower/ucase=upper <br>
- initcap <br>
- trim <br>
`select trim(" Dragos Bratu ");` -> `Dragos Bratu`
- padding <br>
`select lpad("Dra", 4, "#");`-> `#Dra`
- cast <br> 
`select cast("12" as int);` -> `12`
- split/index <br>
`select index(split("Dragos Bratu Florian", " "), 1);` -> `"Bratu"`

### Date manipulation
- current date/timestamp <br>
`select current_date;`
- date_format <br>
`select date_format(current_date, "MM-dd/yyyy");` -> `08-17/2020`
    - "d" - day in the month
    - "D" - day in the year
    - "yyyy" is the calendar year; "YYYY" is the week-based year and can differ around New Year
- day <br>
`select day(current_date);` -> `17`
- to_date <br>
`select to_date(current_timestamp);` -> `2020-08-17`
- to_unix_timestamp <br>
`select to_unix_timestamp(current_date);` -> `1597622400`
- from_unixtime <br>
`select from_unixtime(1597622400);` -> `2020-08-17 00:00:00`

`select to_unix_timestamp(to_date(order_date)) from orders limit 3;`
    - 1374710400
    - 1374710400
    - 1374710400
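The epoch values above can be cross-checked with Python's `datetime`. This sketch assumes the session time zone is UTC, which is what those values imply; Hive itself uses the server's local time zone. The two helpers only mirror the Hive function names and are not Hive APIs.

```python
from datetime import datetime, timezone


def to_unix_timestamp(date_str):
    """'yyyy-MM-dd' -> seconds since the epoch at midnight UTC."""
    return int(datetime.strptime(date_str, "%Y-%m-%d").replace(tzinfo=timezone.utc).timestamp())


def from_unixtime(seconds):
    """Seconds since the epoch -> 'yyyy-MM-dd HH:mm:ss' in UTC."""
    return datetime.fromtimestamp(seconds, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S")


print(to_unix_timestamp("2020-08-17"))  # 1597622400
print(to_unix_timestamp("2013-07-25"))  # 1374710400
print(from_unixtime(1597622400))        # 2020-08-17 00:00:00
```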

### Aggregate functions
- min
- max
- count
- avg

### Case vs nvl

`select order_status,
            case order_status
            when "CLOSED" then "No Action"
            when "COMPLETE" then "No Action"
            end from orders limit 10;`

Without an `else`, any status not listed in a `when` returns NULL.

`select order_status,
            case
                when order_status in ("CLOSED", "COMPLETE") then "No Action"
                when order_status in ("CANCELED","ON_HOLD", "PAYMENT_REVIEW", "PENDING", "PENDING_PAYMENT","PROCESSING","SUSPECTED_FRAUD") then "Action"
                else "Risky"
                end from orders limit 10;`
- CLOSED	No Action
- PENDING_PAYMENT	Action
- COMPLETE	No Action
- CLOSED	No Action
- COMPLETE	No Action
- COMPLETE	No Action
- COMPLETE	No Action
- PROCESSING	Action
- PENDING_PAYMENT	Action
- PENDING_PAYMENT	Action

`select case when order_status is null then "Data missing" else order_status end from orders limit 10;`

or

`select nvl(order_status, "Data missing") from orders limit 10;`

### Row level transformation
`select cast(concat(substr(order_date, 1, 4), substr(order_date, 6, 2)) as int) from orders limit 10;`

or

`select cast(date_format(order_date, "yyyyMM") as int) from orders limit 10;`

### Joins

#### Inner join
- implicit join (comma + `where`): works, but mixes join and filter conditions <br>
`select o.*, c.* from orders o, customers c
where o.order_customer_id = c.customer_id
limit 10;`
- explicit `join ... on` (preferred) <br>
`select o.*, c.* from orders o join customers c
on o.order_customer_id = c.customer_id
limit 10;`

#### Left/Right outer join
`select o.*, c.* from customers c left outer join orders o
on o.order_customer_id = c.customer_id
limit 10;`
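The difference between the inner and the left outer join can be sketched in plain Python with hypothetical `customers` and `orders` tuples: the inner join keeps only matching pairs, while the left outer join also keeps every customer without orders, paired with `None` (NULL in Hive).

```python
# hypothetical sample rows
customers = [(100, "Ann"), (200, "Bob"), (300, "Cid")]  # (customer_id, name)
orders = [(1, 100), (2, 200), (3, 100)]                 # (order_id, order_customer_id)


def inner_join(customers, orders):
    """Pairs (customer, order) where order_customer_id = customer_id."""
    return [(c, o) for c in customers for o in orders if o[1] == c[0]]


def left_outer_join(customers, orders):
    """Like inner_join, but unmatched customers are kept once with None."""
    result = []
    for c in customers:
        matches = [o for o in orders if o[1] == c[0]]
        if matches:
            result.extend((c, o) for o in matches)
        else:
            result.append((c, None))
    return result


print(len(inner_join(customers, orders)))      # 3
print(left_outer_join(customers, orders)[-1])  # ((300, 'Cid'), None)
```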

### Group by and aggregations

Besides aggregate functions, only columns listed in the GROUP BY clause can appear in the SELECT list.

`select order_status, count(1) from orders
group by order_status;`

`CANCELED	2856
CLOSED	15112
COMPLETE	45798`

`select o.order_id, sum(oi.order_item_subtotal) order_revenue
from orders o join order_items oi
on o.order_id = oi.order_item_order_id
where o.order_status in ("COMPLETE", "CLOSED")
group by o.order_id
having sum(oi.order_item_subtotal) >= 10000;`

`68703	13799.639953613281
68724	11439.560012817383
68778	10519.599975585938
68806	10519.680053710938
68821	10519.680053710938
68858	11359.640014648438`
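A plain-Python sketch of what `group by ... having` computes, on a few hypothetical `(order_id, order_item_subtotal)` rows: rows are first aggregated per key, and HAVING then filters the aggregated groups (WHERE, by contrast, filters rows before aggregation).

```python
from collections import defaultdict

# hypothetical (order_id, order_item_subtotal) rows
order_items = [(1, 6000.0), (1, 5000.0), (2, 300.0), (3, 9999.0), (3, 1.0)]

# group by order_id, sum(order_item_subtotal)
revenue = defaultdict(float)
for order_id, subtotal in order_items:
    revenue[order_id] += subtotal

# having sum(...) >= 10000: filter the aggregated groups
big_orders = {order_id: total for order_id, total in revenue.items() if total >= 10000}
print(big_orders)  # {1: 11000.0, 3: 10000.0}
```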

### Sorting

A column alias (here `order_revenue`) can also be used in the ORDER BY clause (unlike WHERE and GROUP BY).

`select o.order_id, round(sum(oi.order_item_subtotal), 2) order_revenue
from orders o join order_items oi
on o.order_id = oi.order_item_order_id
where o.order_status in ("COMPLETE", "CLOSED")
group by o.order_id
having sum(oi.order_item_subtotal) >= 10000
order by order_revenue desc;`

`68703	13799.64
68724	11439.56
68858	11359.64
68821	10519.68
68806	10519.68
68778	10519.6`

`select o.order_id, o.order_date, o.order_status, round(sum(oi.order_item_subtotal), 2) order_revenue
from orders o join order_items oi
on o.order_id = oi.order_item_order_id
where o.order_status in ("COMPLETE", "CLOSED")
group by o.order_id, o.order_date, o.order_status
having sum(oi.order_item_subtotal) >= 3000
distribute by o.order_date sort by o.order_date, order_revenue desc;`
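`order by` produces one globally sorted result, whereas `distribute by` sends all rows with the same key to the same reducer and `sort by` only sorts within each reducer, so the overall output is not globally ordered. The simulation below sketches that behaviour in Python; the reducer count and rows are hypothetical, and `zlib.crc32` merely stands in for Hive's own partitioning hash.

```python
import zlib


def distribute_and_sort(rows, key, sort_key, num_reducers=2):
    """Simulate DISTRIBUTE BY key SORT BY sort_key over num_reducers reducers."""
    partitions = [[] for _ in range(num_reducers)]
    for row in rows:
        # rows with the same key always hash to the same reducer
        idx = zlib.crc32(str(key(row)).encode()) % num_reducers
        partitions[idx].append(row)
    # SORT BY: each reducer sorts only its own rows
    return [sorted(p, key=sort_key) for p in partitions]


rows = [("2013-07-26", 3500.0), ("2013-07-25", 4000.0),  # hypothetical
        ("2013-07-25", 3100.0), ("2013-07-26", 5200.0)]  # (order_date, revenue)
parts = distribute_and_sort(rows, key=lambda r: r[0],
                            sort_key=lambda r: (r[0], -r[1]))  # date, revenue desc
for i, part in enumerate(parts):
    print(i, part)
```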

## Set operations

__Union all__

`select 1, "Dragos"
union all
select 2, "Bratu"
union all
select 1, "Dragos"
union all
select 2, "Florian";`
- 1	Dragos
- 2	Bratu
- 1	Dragos
- 2	Florian

__Union (remove duplicates)__

`select 1, "Dragos"
union
select 2, "Bratu"
union
select 1, "Dragos"
union
select 2, "Florian";`

- 1	Dragos
- 2	Bratu
- 2	Florian

## Windowing and Analytics functions

### Aggregations
- SUM, AVG

`select * from (
select o.order_id, o.order_date, o.order_status, oi.order_item_subtotal,
round(sum(oi.order_item_subtotal) over (partition by o.order_id), 2) order_revenue,
order_item_subtotal/round(sum(oi.order_item_subtotal) over (partition by o.order_id), 2) pct_revenue,
round(avg(oi.order_item_subtotal) over (partition by o.order_id), 2) avg_revenue
from orders o join order_items oi
on o.order_id = oi.order_item_order_id
where o.order_status in ("COMPLETE", "CLOSED")) q
where order_revenue >= 3000
order by order_date, order_revenue desc;`
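Unlike `group by`, a windowed `sum(...) over (partition by ...)` keeps every input row and attaches the partition total to it, which is what makes the `pct_revenue` ratio possible. A plain-Python sketch with hypothetical rows:

```python
from collections import defaultdict

# hypothetical (order_id, order_item_subtotal) rows
rows = [(1, 200.0), (1, 100.0), (2, 50.0)]

# sum(order_item_subtotal) over (partition by order_id)
totals = defaultdict(float)
for order_id, subtotal in rows:
    totals[order_id] += subtotal

# every input row survives and carries its partition total and share of it
windowed = [(order_id, subtotal, totals[order_id], subtotal / totals[order_id])
            for order_id, subtotal in rows]
for row in windowed:
    print(row)
```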

### Ranking

`select * from (
select o.order_id, o.order_date, o.order_status, oi.order_item_subtotal,
round(sum(oi.order_item_subtotal) over (partition by o.order_id), 2) order_revenue,
order_item_subtotal/round(sum(oi.order_item_subtotal) over (partition by o.order_id), 2) pct_revenue,
round(avg(oi.order_item_subtotal) over (partition by o.order_id), 2) avg_revenue,
rank() over (partition by o.order_id order by oi.order_item_subtotal desc) rnk_revenue,
dense_rank() over (partition by o.order_id order by oi.order_item_subtotal desc) dense_rnk_revenue,
percent_rank() over (partition by o.order_id order by oi.order_item_subtotal desc) pct_rnk_revenue,
row_number() over (partition by o.order_id order by oi.order_item_subtotal desc) rn_ordered_revenue,
row_number() over (partition by o.order_id) rn_revenue
from orders o join order_items oi
on o.order_id = oi.order_item_order_id
where o.order_status in ("COMPLETE", "CLOSED")) q
where order_revenue >= 1000
order by order_date, order_revenue desc, rnk_revenue;`
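How the four ranking functions differ on ties can be sketched in plain Python for a single partition ordered descending (values hypothetical). Note that `row_number()` numbers tied rows in an arbitrary order in Hive; here Python's stable sort decides, and `row_number() over (partition by o.order_id)` without `order by` is arbitrary altogether.

```python
def rank_functions(values):
    """Return (value, rank, dense_rank, percent_rank, row_number) tuples,
    ordering the values descending as in `order by ... desc`."""
    ordered = sorted(values, reverse=True)
    n = len(ordered)
    out = []
    rank = dense = 0
    prev = None
    for row_number, v in enumerate(ordered, start=1):
        if row_number == 1 or v != prev:
            rank = row_number  # rank jumps past tied positions
            dense += 1         # dense_rank never leaves gaps
            prev = v
        # percent_rank = (rank - 1) / (rows in partition - 1); 0.0 for a single row
        pct = (rank - 1) / (n - 1) if n > 1 else 0.0
        out.append((v, rank, dense, pct, row_number))
    return out


for row in rank_functions([50.0, 100.0, 20.0, 50.0]):
    print(row)
```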

### Windowing functions
LEAD, LAG, FIRST_VALUE, LAST_VALUE

`select * from (
select o.order_id, o.order_date, oi.order_item_subtotal,
round(sum(oi.order_item_subtotal) over (partition by o.order_id), 2) order_revenue,
lead(oi.order_item_subtotal) over (partition by o.order_id order by oi.order_item_subtotal desc) lead_order_item_subtotal,
lag(oi.order_item_subtotal) over (partition by o.order_id order by oi.order_item_subtotal desc) lag_order_item_subtotal,
first_value(oi.order_item_subtotal) over (partition by o.order_id order by oi.order_item_subtotal desc) fv_order_item_subtotal,
last_value(oi.order_item_subtotal) over (partition by o.order_id order by oi.order_item_subtotal desc rows between unbounded preceding and unbounded following) lv_order_item_subtotal
from orders o join order_items oi
on o.order_id = oi.order_item_order_id
where o.order_status in ("COMPLETE", "CLOSED")) q
where order_revenue >= 1000
order by order_date, order_revenue desc;`
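Within a partition ordered descending, `lead` reads the next row, `lag` the previous one and `first_value` the top row. `last_value` is the usual surprise: with an `order by` and no explicit frame, the window frame runs from the start of the partition to the current row, so it returns the current row's value unless the frame is widened with `rows between unbounded preceding and unbounded following`. A plain-Python sketch of those semantics for one hypothetical partition:

```python
def lead_lag_first_last(values):
    """Return (value, lead, lag, first_value, last_value_default, last_value_full)
    per row of one partition ordered descending; None plays the role of NULL."""
    ordered = sorted(values, reverse=True)
    rows = []
    for i, v in enumerate(ordered):
        lead = ordered[i + 1] if i + 1 < len(ordered) else None  # next row
        lag = ordered[i - 1] if i > 0 else None                  # previous row
        first = ordered[0]
        # default frame ends at the current row (and its ties), so last_value
        # returns the current row's value
        last_default = v
        # frame widened to the whole partition: the real last value
        last_full = ordered[-1]
        rows.append((v, lead, lag, first, last_default, last_full))
    return rows


for row in lead_lag_first_last([10.0, 30.0, 20.0]):
    print(row)
```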

## Temp tables

- For pyspark shell

`ordersRDD = sc.textFile("/Users/dbratu/Documents/big_data/data-master/retail_db/orders")`

`from pyspark.sql import Row
ordersDF = ordersRDD.map(lambda o: o.split(",")).map(lambda f: Row(order_id=int(f[0]), order_date=f[1], order_customer_id=int(f[2]), order_status=f[3])).toDF()
ordersDF.show()`


- For pycharm

`from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.sql import Row, SparkSession`

`sc = SparkContext(master="local", appName="Spark Demo")
spark = SparkSession(sc)`

`ordersRDD = sc.textFile("/Users/dbratu/Documents/big_data/data-master/retail_db/orders")
ordersDF = ordersRDD.map(lambda o: o.split(",")).map(lambda f: Row(order_id=int(f[0]), order_date=f[1], order_customer_id=int(f[2]), order_status=f[3])).toDF()
ordersDF.show()`


- Create a temporary table

`ordersDF.createOrReplaceTempView("ordersDF_table")
spark.sql("select * from ordersDF_table limit 10").show()`

`+--------+--------------------+-----------------+---------------+
|order_id|          order_date|order_customer_id|   order_status|
+--------+--------------------+-----------------+---------------+
|       1|2013-07-25 00:00:...|            11599|         CLOSED|
|       2|2013-07-25 00:00:...|              256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:...|            12111|       COMPLETE|
|       4|2013-07-25 00:00:...|             8827|         CLOSED|
|       5|2013-07-25 00:00:...|            11318|       COMPLETE|
|       6|2013-07-25 00:00:...|             7130|       COMPLETE|
|       7|2013-07-25 00:00:...|             4530|       COMPLETE|
|       8|2013-07-25 00:00:...|             2911|     PROCESSING|
|       9|2013-07-25 00:00:...|             5657|PENDING_PAYMENT|
|      10|2013-07-25 00:00:...|             5648|PENDING_PAYMENT|
+--------+--------------------+-----------------+---------------+`

`spark.sql("select order_status, count(1) as count from ordersDF_table group by order_status").show()`

`+---------------+-----+
|   order_status|count|
+---------------+-----+
|PENDING_PAYMENT|15030|
|       COMPLETE|22899|
|        ON_HOLD| 3798|
| PAYMENT_REVIEW|  729|
|     PROCESSING| 8275|
|         CLOSED| 7556|
|SUSPECTED_FRAUD| 1558|
|        PENDING| 7610|
|       CANCELED| 1428|
+---------------+-----+`


## Table from local file system data

`from pyspark.sql import Row, SparkSession
productsRAW = open("/Users/dbratu/Documents/big_data/data-master/retail_db/products/part-00000").read().splitlines()
productsRDD = sc.parallelize(productsRAW)
productsDF = productsRDD.map(lambda o: Row(product_id=int(o.split(",")[0]), product_name=o.split(",")[2])).toDF()
productsDF.registerTempTable("products")
sqlContext.sql("show tables;").show()
`

`+--------+-----------+-----------+
|database|  tableName|isTemporary|
+--------+-----------+-----------+
|  dbratu|  customers|      false|
|  dbratu|order_items|      false|
|  dbratu|     orders|      false|
|        |   products|       true|
+--------+-----------+-----------+`

Join operation

`sqlContext.sql("\
SELECT o.order_date, p.product_name, sum(oi.order_item_subtotal) daily_revenue \
FROM orders o \
JOIN order_items oi \
ON o.order_id = oi.order_item_order_id \
JOIN products p \
ON p.product_id = oi.order_item_product_id \
WHERE o.order_status IN ('CLOSED', 'COMPLETE') \
GROUP BY o.order_date, p.product_name \
ORDER BY o.order_date, daily_revenue DESC;").show()
`

## Write dataframes to hive tables

`sqlContext.sql("\
CREATE TABLE daily_revenue \
(order_date string, \
product_name string, \
daily_revenue float\
) row format delimited fields terminated by ',' \
stored as textfile;")`

`daily_revenueDF = sqlContext.sql("\
SELECT o.order_date, p.product_name, sum(oi.order_item_subtotal) daily_revenue \
FROM orders o \
JOIN order_items oi \
ON o.order_id = oi.order_item_order_id \
JOIN products p \
ON p.product_id = oi.order_item_product_id \
WHERE o.order_status IN ('CLOSED', 'COMPLETE') \
GROUP BY o.order_date, p.product_name \
ORDER BY o.order_date, daily_revenue DESC;")`

`daily_revenueDF.write.insertInto("dbratu.daily_revenue")`

# Problems
1.  1
    b. 2 
2. 10
3. 12
4. 13
5. 15
6. 19
7. 23
    b. 24 x
    b. 27 x
8. 28
9. 32
    b. 34 x
10. 41
11. 46
12. 48
13. 50
14. 52    x
15. 53
    b. 57 x
16. 58
17. 60
18. 63
19. 65
20. 79
21. 80
22. 81
23. 83
24. 89
    b. 91 x
25. 95

Problem Scenario 1
You have been given a MySQL DB with the following details.
user=retail_dba
password=cloudera
database=retail_db
table=retail_db.categories
jdbc URL = jdbc:mysql://quickstart:3306/retail_db

Please accomplish following activities.
1. Connect MySQL DB and check the content of the tables.
2. Copy the "retail_db.categories" table to HDFS, without specifying a directory name.
3. Copy the "retail_db.categories" table to HDFS, in a directory named "categories_target".
4. Copy the "retail_db.categories" table to HDFS, in a warehouse directory named
"categories_warehouse".
##############################################################################################
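from pyspark.sql import SparkSession

spark = SparkSession.\
    builder.\
    appName("app").\
    master('local').\
    getOrCreate()

# 1. Read the table over JDBC and check its content
categories = spark.\
    read.\
    format("jdbc").\
    options(url="jdbc:mysql://quickstart:3306/retail_db",
            driver="com.mysql.jdbc.Driver",
            dbtable="categories",
            user="retail_dba",
            password="cloudera").\
    load()
categories.show()

# 2.-4. Spark equivalents of the Sqoop imports (relative paths resolve to the HDFS home directory)
categories.write.csv("categories")
categories.write.csv("categories_target")
categories.write.csv("categories_warehouse/categories")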

Problem Scenario 2
There is a parent organization called "ABC Group Inc", which has two child companies named 
Tech Inc and MPTech.
The employee information of both companies is given in two separate text files, as below. 
Please do the following activities for the employee details.

Tech Inc.txt -
1,Alok,Hyderabad
2,Krish,Hongkong
3,Jyoti,Mumbai
4,Atul,Banglore
5,Ishan,Gurgaon

MPTech.txt -
6,John,Newyork
7,alp2004,California
8,tellme,Mumbai
9,Gagan21,Pune
10,Mukesh,Chennai

1. Which command will you use to check all the available command line options on HDFS, and 
how will you get help for an individual command?
2. Create a new empty directory named Employee using the command line, and also create an empty 
file named Techinc.txt in it.
3. Load both companies' employee data into the Employee directory 
(how do you override an existing file in HDFS?).
4. Merge both companies' employee data into a single file called MergedEmployee.txt; 
the merged file should have a newline character at the end of each file's content.
5. Upload the merged file to HDFS and change its permissions on HDFS, 
so that the owner and group members can read and write, and other users can read the file.
6. Write a command to export an individual file as well as the entire directory from HDFS to 
the local file system.
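##############################################################################################
1. hdfs dfs lists all available options; hdfs dfs -help ls shows the help for a single command (here ls)
2. hdfs dfs -mkdir Employee, then hdfs dfs -touchz Employee/Techinc.txt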

Problem Scenario 10
You have been given the following MySQL database details as well as other info. 
user=retail_dba 
password=cloudera 
database=retail_db 
jdbc URL = jdbc:mysql://quickstart:3306/retail_db

Please accomplish the following.
1. Create a database named hadoopexam and then create a table named departments in it, 
with the following fields: department_id int, department_name string. The location should be 
hdfs://quickstart.cloudera:8020/user/hive/warehouse/hadoopexam.db/departments
2. Import data from retail_db.departments into the existing Hive table 
hadoopexam.departments created above.
3. Import data into a non-existing table, i.e. create the Hive table 
hadoopexam.departments_new while importing.
##############################################################################################
1

Problem Scenario 12
You have been given the following MySQL database details as well as other info. 
user=retail_dba 
password=cloudera 
database=retail_db 
jdbc URL = jdbc:mysql://quickstart:3306/retail_db

Please accomplish the following.
1. Create a table in retail_db with the following definition.
CREATE table departments_new (department_id int(11), department_name varchar(45), 
                              created_date TIMESTAMP DEFAULT NOW());
2. Now insert the records from the departments table into departments_new.
3. Now import data from the departments_new table to HDFS.
4. Insert the following 5 records into the departments_new table. 
Insert into departments_new values(110, "Civil" , null); 
Insert into departments_new values(111, "Mechanical" , null);
Insert into departments_new values(112, "Automobile" , null); 
Insert into departments_new values(113, "Pharma" , null);
Insert into departments_new values(114, "Social Engineering" , null);
5. Now do the incremental import based on the created_date column.
##############################################################################################
1

Problem Scenario 13
You have been given the following MySQL database details as well as other info. 
user=retail_dba 
password=cloudera 
database=retail_db 
jdbc URL = jdbc:mysql://quickstart:3306/retail_db

Please accomplish the following.
1. Create a table in retail_db with the following definition.
CREATE table departments_export (department_id int(11), department_name varchar(45), 
                                 created_date TIMESTAMP DEFAULT NOW());
2. Now import the data from the following directory into the departments_export table:
/user/cloudera/departments_new
##############################################################################################
1

Problem Scenario 15
You have been given the following MySQL database details as well as other info. 
user=retail_dba 
password=cloudera 
database=retail_db 
jdbc URL = jdbc:mysql://quickstart:3306/retail_db

Please accomplish the following activities.
1. In the MySQL departments table, insert the following record. 
Insert into departments values(9999, '"Data Science"');
2. Now there is a downstream system which will process dumps of this file. 
However, the system is designed so that it can only process files whose fields
are enclosed in (') single quotes, whose field separator is (-) and whose lines are 
terminated by : (colon).
3. If the data itself contains the " (double quote), then it should be escaped by \.
4. Import the departments table into a directory called departments_enclosedby, so that the 
file can be processed by the downstream system.
##############################################################################################
1        

Problem Scenario 19
You have been given the following MySQL database details as well as other info. 
user=retail_dba 
password=cloudera 
database=retail_db 
jdbc URL = jdbc:mysql://quickstart:3306/retail_db

Now accomplish the following activities.
1. Import the departments table from MySQL to HDFS as a text file in the departments_text directory.
2. Import the departments table from MySQL to HDFS as a sequence file in the departments_sequence directory.
3. Import the departments table from MySQL to HDFS as an Avro file in the departments_avro directory.
4. Import the departments table from MySQL to HDFS as a Parquet file in the departments_parquet directory.

Problem Scenario 23
You have been given a log generating service as below.
Start_logs (it will generate continuous logs)
Tail_logs (you can check what logs are being generated)
Stop_logs (it will stop the log service)
Path where logs are generated using the above service: /opt/gen_logs/logs/access.log
Now write a Flume configuration file named flume3.conf and, using that configuration file, 
dump the logs into HDFS in a directory called flumeflume3/%Y/%m/%d/%H/%M
(meaning a new directory should be created every minute). Please use interceptors to 
provide timestamp information if the message header does not have it,
and note that you have to preserve the existing timestamp if the message contains it. 
The Flume channel should have the following properties as well: it should commit after every 
100 messages, use a non-durable/faster channel, and be able to hold a maximum of 1000 
events.
##############################################################################################
FLUME

Problem Scenario 24
You have been given the comma-separated employee information below.
Data Set:
name,salary,sex,age
alok,100000,male,29
jatin,105000,male,32
yogesh,134000,male,39
ragini,112000,female,35
jyotsana,129000,female,39
valmiki,123000,male,29

Use the netcat service on port 44444, and nc the above data line by line. 
Please do the following activities.
1. Create a Flume conf file using the fastest channel, which writes data into the Hive warehouse 
directory, in a table called flumemaleemployee (create the Hive table as well for the given data).
2. While importing, make sure only male employee data is stored.
##############################################################################################
# Step 1 : Create hive table for flumemaleemployee
CREATE TABLE flumemaleemployee
(
name string,
salary int,
sex string,
age int
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
# Step 2 : Create flume configuration file

Problem Scenario 27
You need to implement a near real time solution for collecting information when it is submitted 
in files, as below.

Data -
echo "IBM,100,20160104" >> /tmp/spooldir/bb/.bb.txt
echo "IBM,103,20160105" >> /tmp/spooldir/bb/.bb.txt
mv /tmp/spooldir/bb/.bb.txt /tmp/spooldir/bb/bb.txt

After few mins -
echo "IBM,100.2,20160104" >> /tmp/spooldir/dr/.dr.txt
echo "IBM,103.1,20160105" >> /tmp/spooldir/dr/.dr.txt
mv /tmp/spooldir/dr/.dr.txt /tmp/spooldir/dr/dr.txt

You have been given the directory location below (if it is not available, then create it): 
/tmp/spooldir
You have a financial subscription for getting stock prices from Bloomberg as well 
as Reuters, and using FTP you download new files every hour from their respective FTP sites 
into the directories /tmp/spooldir/bb and /tmp/spooldir/dr respectively.
As soon as a file is committed in these directories, it needs to be available in HDFS in the
/tmp/flume/finance location, in a single directory.
Write a Flume configuration file named flume7.conf and use it to load data into HDFS with the 
following additional properties:
    1. Spool /tmp/spooldir/bb and /tmp/spooldir/dr
    2. File prefix in HDFS should be events
    3. File suffix should be .log
    4. If a file is not committed and is in use, then it should have _ as prefix.
    5. Data should be written as text to HDFS
##############################################################################################
# FLUME APP

Problem Scenario 28
You need to implement a near real time solution for collecting information when it is 
submitted in files, as below.

Data -
echo "IBM,100,20160104" >> /tmp/spooldir2/.bb.txt
echo "IBM,103,20160105" >> /tmp/spooldir2/.bb.txt
mv /tmp/spooldir2/.bb.txt /tmp/spooldir2/bb.txt

After few mins -
echo "IBM,100.2,20160104" >> /tmp/spooldir2/.dr.txt
echo "IBM,103.1,20160105" >> /tmp/spooldir2/.dr.txt
mv /tmp/spooldir2/.dr.txt /tmp/spooldir2/dr.txt

You have been given the directory location below (if it is not available, then create it): 
/tmp/spooldir2
As soon as a file is committed in this directory, it needs to be available in HDFS in both the
/tmp/flume/primary and the /tmp/flume/secondary locations.
However, note that /tmp/flume/secondary is optional: if a transaction which writes 
to this directory fails, it need not be rolled back.
Write a Flume configuration file named flume8.conf and use it to load data into HDFS with the 
following additional properties.
1. Spool the /tmp/spooldir2 directory
2. File prefix in HDFS should be events
3. File suffix should be .log
4. If a file is not committed and is in use, then it should have _ as prefix.
5. Data should be written as text to HDFS
##############################################################################################
FLUME

Problem Scenario 32
You have been given three files as below.
spark3/sparkdir1/file1.txt
spark3/sparkdir2/file2.txt
spark3/sparkdir3/file3.txt
Each file contains some text.

spark3/sparkdir1/file1.txt
Apache Hadoop is an open-source software framework written in Java for distributed storage 
and distributed processing of very large data sets on computer clusters built from commodity 
hardware. All the modules in Hadoop are designed with a fundamental assumption that hardware 
failures are common and should be automatically handled by the framework 

spark3/sparkdir2/file2.txt
The core of Apache Hadoop consists of a storage part known as Hadoop Distributed File
System (HDFS) and a processing part called MapReduce. Hadoop splits files into large blocks 
and distributes them across nodes in a cluster. To process data, Hadoop transfers packaged 
code for nodes to process in parallel based on the data that needs to be processed. 

spark3/sparkdir3/file3.txt 
This approach takes advantage of data locality nodes manipulating the data they have access 
to to allow the dataset to be processed faster and more efficiently than it would be in a 
more conventional supercomputer architecture that relies on a parallel file system where 
computation and data are distributed via high-speed networking


Now write Spark code in Scala which will load all three files from HDFS and do 
the word count, filtering out the following words. The result should be sorted by word count 
in reverse order.
Filter words ("a","the","an", "as", "a","with","this","these","is","are","in", "for",
"to","and","The","of")
Also make sure you load all three files as a single RDD 
(all three files must be loaded using a single API call).
You have also been given the following codec
import org.apache.hadoop.io.compress.GzipCodec
Please use the above codec to compress the file while saving it in HDFS.
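The word-count logic itself (split into words, drop the filter words, count, sort by count descending) can be sketched in plain Python; splitting on whitespace is an assumption, so punctuation stays attached to words. In Spark, the three files could be passed to a single `sc.textFile` call as a comma-separated path list, and the Scala `saveAsTextFile(path, classOf[GzipCodec])` overload writes compressed output. The Spark calls in the comments only indicate the corresponding RDD step.

```python
from collections import Counter

# filter words from the problem statement; matching is case-sensitive,
# which is why both "the" and "The" are listed
FILTER_WORDS = {"a", "the", "an", "as", "with", "this", "these", "is", "are",
                "in", "for", "to", "and", "The", "of"}


def word_count(texts):
    """Count words across all texts, skipping FILTER_WORDS, highest count first."""
    counts = Counter()
    for text in texts:                    # one entry per file/line
        for word in text.split():         # flatMap(_.split(" "))
            if word not in FILTER_WORDS:  # filter(...)
                counts[word] += 1         # map((_, 1)).reduceByKey(_ + _)
    return counts.most_common()           # sortBy(_._2, ascending = false)


print(word_count(["The core of Apache Hadoop", "Apache Hadoop and HDFS", "Hadoop in Java"]))
```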

Problem Scenario 34
You have been given a file:

user.csv
    id,topic,hits
    Rahul,scala,120
    Nikita,spark,80
    Mithun,spark,1
    myself,cca175,180

Now write Spark code in Scala which will remove the header part and create an RDD of values 
as below, for all rows. Also, if id is "myself", then filter out the row.
Map(id -> om, topic -> scala, hits -> 120)
##############################################################################################
a = spark.read.csv(path="/Users/dbratu/Documents/big_data/1.csv")
header = a.first()
b = a.filter((a._c0 != header._c0) & (a._c0 != 'myself'))
b.write.csv("/Users/dbratu/Documents/big_data/2.csv")
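The solution above filters the rows but does not build the requested maps. A plain-Python sketch of the full transformation: drop the header, zip each row with the header names into a dict (standing in for the Scala `Map`), and filter out the `myself` row. Values stay strings, as they would after splitting a text line.

```python
def rows_to_maps(lines, skip_id="myself"):
    """Drop the header, turn each row into a header->value dict, skip `skip_id` rows."""
    header = [h.strip() for h in lines[0].split(",")]
    records = []
    for line in lines[1:]:                   # everything after the header
        record = dict(zip(header, (v.strip() for v in line.split(","))))
        if record["id"] != skip_id:          # filter out id == "myself"
            records.append(record)
    return records


lines = ["id,topic,hits", "Rahul,scala,120", "Nikita,spark,80",
         "Mithun,spark,1", "myself,cca175,180"]
for r in rows_to_maps(lines):
    print(r)  # first: {'id': 'Rahul', 'topic': 'scala', 'hits': '120'}
```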

Problem Scenario 41
You have been given the code snippet below. 
val au1 = sc.parallelize(List(("a", Array(1,2)), ("b", Array(1,2)))) 
val au2 = sc.parallelize(List(("a", Array(3)), ("b", Array(2))))

Apply the Spark method which will generate the output below.
Array[(String, Array[Int])] = Array((a,Array(1, 2)), 
                                    (b,Array(1, 2)), 
                                    (a,Array(3)), (b,Array(2)))

Problem Scenario 46
You have been given the list below in Scala (name, sex, cost) for each work done.
List( ("Deeapak" , "male", 4000), ("Deepak" , "male", 2000), ("Deepika" , "female",
2000),("Deepak" , "female", 2000), ("Deepak" , "male", 1000) , ("Neeta" , "female", 2000))

Now write a Spark program to load this list as an RDD and sum the cost for each combination
of name and sex (as key).
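The aggregation is a sum keyed by `(name, sex)`, which in Spark corresponds to `reduceByKey(_ + _)` on `((name, sex), cost)` pairs. A plain-Python sketch of the same computation; it assumes the first entry's "Deeapak" is a typo for "Deepak".

```python
from collections import defaultdict

work = [("Deepak", "male", 4000), ("Deepak", "male", 2000),
        ("Deepika", "female", 2000), ("Deepak", "female", 2000),
        ("Deepak", "male", 1000), ("Neeta", "female", 2000)]

totals = defaultdict(int)
for name, sex, cost in work:
    totals[(name, sex)] += cost  # reduceByKey(_ + _) on ((name, sex), cost)

print(dict(totals))
# {('Deepak', 'male'): 7000, ('Deepika', 'female'): 2000,
#  ('Deepak', 'female'): 2000, ('Neeta', 'female'): 2000}
```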

Problem Scenario 48
You have been given the Python code snippet below, with intermediate output.
We want to take a list of records about people and then sum up their ages 
and count them.
So for this example the type in the RDD will be a dictionary in the format 
{name: NAME, age: AGE, gender: GENDER}.
The result type will be a tuple that looks like (Sum of Ages, Count) 
people = []
people.append({'name':'Amit', 'age':45,'gender':'M'}) 
people.append({'name':'Ganga', 'age':43,'gender':'F'}) 
people.append({'name':'John', 'age':28,'gender':'M'}) 
people.append({'name':'Lolita', 'age':33,'gender':'F'}) 
people.append({'name':'Dont Know', 'age':18,'gender':'T'}) 
peopleRdd = sc.parallelize(people)  # Create an RDD 
peopleRdd.aggregate((0,0), seqOp, combOp)  # Output of above line: (167, 5)
Now define the two operations seqOp and combOp, such that
seqOp: sums the ages of all people and counts them, in each partition.
combOp: combines the results from all partitions.
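`aggregate(zeroValue, seqOp, combOp)` folds each partition with `seqOp` starting from the zero value, then merges the per-partition results with `combOp`. The sketch below defines the two functions the problem asks for and checks them with a small plain-Python stand-in for `aggregate` (the `aggregate` helper and the two-partition split are hypothetical, for illustration only); the same `seqOp` and `combOp` can be passed to `peopleRdd.aggregate((0,0), seqOp, combOp)`.

```python
from functools import reduce

people = [
    {"name": "Amit", "age": 45, "gender": "M"},
    {"name": "Ganga", "age": 43, "gender": "F"},
    {"name": "John", "age": 28, "gender": "M"},
    {"name": "Lolita", "age": 33, "gender": "F"},
    {"name": "Dont Know", "age": 18, "gender": "T"},
]


def seqOp(acc, person):
    """Within one partition: add this person's age and count them."""
    return (acc[0] + person["age"], acc[1] + 1)


def combOp(left, right):
    """Across partitions: add the partial (sum, count) pairs."""
    return (left[0] + right[0], left[1] + right[1])


def aggregate(partitions, zero, seq_op, comb_op):
    """Hypothetical plain-Python stand-in for RDD.aggregate."""
    partials = [reduce(seq_op, part, zero) for part in partitions]
    return reduce(comb_op, partials, zero)


print(aggregate([people[:2], people[2:]], (0, 0), seqOp, combOp))  # (167, 5)
```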

Problem Scenario 50
You have been given the code snippet below (calculating an average score), 
with intermediate output. 
type ScoreCollector = (Int, Double) 
type PersonScores = (String, (Int, Double)) 
val initialScores = Array(("Fred", 88.0), ("Fred", 95.0), ("Fred", 91.0), ("Wilma", 93.0),
("Wilma", 95.0), ("Wilma", 98.0))
val wilmaAndFredScores = sc.parallelize(initialScores).cache() 
val scores = wilmaAndFredScores.combineByKey(createScoreCombiner, scoreCombiner, scoreMerger) 
val averagingFunction = (personScore: PersonScores) => {
  val (name, (numberScores, totalScore)) = personScore
  (name, totalScore / numberScores)
}
val averageScores = scores.collectAsMap().map(averagingFunction)
Expected output: averageScores: scala.collection.Map[String,Double] = Map(Fred ->
91.33333333333333, Wilma -> 95.33333333333333)
Define all three required functions, which are the inputs for the combineByKey method, e.g.
(createScoreCombiner, scoreCombiner, scoreMerger), and help us produce the required results.
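`combineByKey` builds a per-key accumulator with three functions: one creates it from the first value of a key in a partition, one folds further values in, and one merges accumulators from different partitions. The problem is in Scala; the sketch below renders the three functions in Python with a `(numberScores, totalScore)` tuple matching `ScoreCollector`, and checks them with a hypothetical plain-Python `combine_by_key` helper over a two-partition split.

```python
def createScoreCombiner(score):
    return (1, score)                      # (numberScores, totalScore)


def scoreCombiner(collector, score):
    count, total = collector
    return (count + 1, total + score)      # fold one more score in


def scoreMerger(c1, c2):
    return (c1[0] + c2[0], c1[1] + c2[1])  # merge two partitions' collectors


def combine_by_key(partitions, create, merge_value, merge_combiners):
    """Hypothetical plain-Python stand-in for combineByKey."""
    merged = {}
    for part in partitions:
        local = {}
        for key, value in part:
            local[key] = merge_value(local[key], value) if key in local else create(value)
        for key, comb in local.items():
            merged[key] = merge_combiners(merged[key], comb) if key in merged else comb
    return merged


initial_scores = [("Fred", 88.0), ("Fred", 95.0), ("Fred", 91.0),
                  ("Wilma", 93.0), ("Wilma", 95.0), ("Wilma", 98.0)]
partitions = [initial_scores[:4], initial_scores[4:]]  # hypothetical split
scores = combine_by_key(partitions, createScoreCombiner, scoreCombiner, scoreMerger)
average_scores = {name: total / count for name, (count, total) in scores.items()}
print(average_scores)
```

In Scala the same three functions could be written as `(score: Double) => (1, score)`, `(c: ScoreCollector, score: Double) => (c._1 + 1, c._2 + score)` and `(c1: ScoreCollector, c2: ScoreCollector) => (c1._1 + c2._1, c1._2 + c2._2)`.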

# Problem Scenario 52
"""You have been given below code snippet:
b = sc.parallelize([1,2,3,4,5,6,7,8,2,4,2,1,1,1,1,1])
Write a correct code snippet which will produce below output:
(5 -> 1, 8 -> 1, 3 -> 1, 6 -> 1, 1 -> 5, 2 -> 3,4 -> 2, 7 ->1)"""

x = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 2, 4, 2, 1, 1, 1, 1, 1]).countByValue()
for item in x.items():
    print(item[0], '->', item[1])
# 1 -> 6
# 2 -> 3
# .
# .
# 8 -> 1
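`countByValue()` returns a map from each distinct value to its number of occurrences, which `collections.Counter` reproduces in plain Python. Counting the given list this way shows that 1 occurs six times, so the `1 -> 5` in the expected output appears to be a typo in the problem statement.

```python
from collections import Counter

b = [1, 2, 3, 4, 5, 6, 7, 8, 2, 4, 2, 1, 1, 1, 1, 1]
counts = Counter(b)  # plays the role of sc.parallelize(b).countByValue()
for value, count in sorted(counts.items()):
    print(value, "->", count)
```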

Problem Scenario 53
You have been given the code snippet below. 
val a = sc.parallelize(1 to 10, 3) 
operation1 b.collect

Output 1 -
Array[Int] = Array(2, 4, 6, 8, 10)
operation2

Output 2 -
Array[Int] = Array(1, 2, 3)
Write a correct code snippet for operation1 and operation2 which will produce the desired output, 
shown above.

Problem Scenario 57
You have been given the code snippet below:
a = sc.parallelize(range(1, 10))
Write a correct code snippet which will produce:
((even,[2, 4, 6, 8]), (odd,[1, 3, 5, 7,9]))
##############################################################################################
a = sc.parallelize(range(1, 10))
b = a.groupBy(lambda x: 'even' if x%2 == 0 else 'odd')
for item in b.collect(): 
    print(item[0], ': ', list(item[1]))

Problem Scenario 58
You have been given the code snippet below. 
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "spider", "eagle"), 2) 
val b = a.keyBy(_.length)
operation1

Write a correct code snippet for operation1 which will produce the desired output, shown below.
Array[(Int, Seq[String])] = Array((4,ArrayBuffer(lion)), (6,ArrayBuffer(spider)),
(3,ArrayBuffer(dog, cat)), (5,ArrayBuffer(tiger, eagle)))
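After `keyBy(_.length)` each word is keyed by its length, and grouping by that key yields the expected output, so `operation1` is most likely `b.groupByKey.collect` (an inference from the output, not part of the problem). The same grouping in plain Python:

```python
from collections import defaultdict

a = ["dog", "tiger", "lion", "cat", "spider", "eagle"]
b = [(len(x), x) for x in a]   # a.keyBy(_.length)

grouped = defaultdict(list)
for length, word in b:         # groupByKey: collect all values per key
    grouped[length].append(word)

print(dict(grouped))  # {3: ['dog', 'cat'], 5: ['tiger', 'eagle'], 4: ['lion'], 6: ['spider']}
```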

Problem Scenario 60
You have been given the code snippet below. 
val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3) 
val b = a.keyBy(_.length) 
val c = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3) 
val d = c.keyBy(_.length)
operation1

Write a correct code snippet for operation1 which will produce the desired output, shown below.
Array[(Int, (String, String))] = Array((6,(salmon,salmon)), (6,(salmon,rabbit)),
(6,(salmon,turkey)), (6,(salmon,salmon)), (6,(salmon,rabbit)),
(6,(salmon,turkey)), (3,(dog,dog)), (3,(dog,cat)), (3,(dog,gnu)), (3,(dog,bee)), (3,(rat,dog)),
(3,(rat,cat)), (3,(rat,gnu)), (3,(rat,bee)))
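The expected output contains every pair of equal-length words from the two keyed RDDs, which is an inner join on the length key (most likely `b.join(d).collect` in Scala; inferred from the output). A plain-Python sketch of that join:

```python
a = ["dog", "salmon", "salmon", "rat", "elephant"]
c = ["dog", "cat", "gnu", "salmon", "rabbit", "turkey", "wolf", "bear", "bee"]
b = [(len(x), x) for x in a]  # a.keyBy(_.length)
d = [(len(x), x) for x in c]  # c.keyBy(_.length)

# b.join(d): every (left, right) pair whose keys are equal; unmatched keys vanish
joined = [(kb, (vb, vd)) for kb, vb in b for kd, vd in d if kb == kd]

print(len(joined))  # 14
```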

Problem Scenario 63
You have been given the code snippet below. 
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2) 
val b = a.map(x => (x.length, x))
operation1
Write a correct code snippet for operation1 which will produce the desired output, shown below.
Array[(Int, String)] = Array((4,lion), (3,dogcat), (7,panther), (5,tigereagle))
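The expected output concatenates all words sharing a length, which matches `reduceByKey(_ + _)` on strings (most likely `b.reduceByKey(_ + _).collect`; inferred from the output). In plain Python, with the caveat that Spark's concatenation order depends on partition order:

```python
a = ["dog", "tiger", "lion", "cat", "panther", "eagle"]
b = [(len(x), x) for x in a]  # a.map(x => (x.length, x))

reduced = {}
for length, word in b:        # reduceByKey(_ + _): concatenate per key
    reduced[length] = reduced[length] + word if length in reduced else word

print(reduced)  # {3: 'dogcat', 5: 'tigereagle', 4: 'lion', 7: 'panther'}
```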

Problem Scenario 65
You have been given the code snippet below. 
val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2) 
val b = sc.parallelize(1 to a.count.toInt, 2) 
val c = a.zip(b)
operation1
Write a correct code snippet for operation1 which will produce the desired output, shown below.
Array[(String, Int)] = Array((owl,3), (gnu,4), (dog,1), (cat,2), (ant,5))
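The expected output is the zipped pairs sorted by key in descending order, i.e. most likely `c.sortByKey(false).collect` (inferred from the output). A plain-Python sketch:

```python
a = ["dog", "cat", "owl", "gnu", "ant"]
b = range(1, len(a) + 1)                                # 1 to a.count.toInt
c = list(zip(a, b))                                     # a.zip(b)
result = sorted(c, key=lambda kv: kv[0], reverse=True)  # sortByKey(false)
print(result)  # [('owl', 3), ('gnu', 4), ('dog', 1), ('cat', 2), ('ant', 5)]
```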

Problem Scenario 79
You have been given a MySQL DB with the following details. 
user=retail_dba 
password=cloudera 
database=retail_db 
table=retail_db.orders 
table=retail_db.order_items 
jdbc URL = jdbc:mysql://quickstart:3306/retail_db

Columns of products table: 
(product_id | product_category_id | product_name | product_description | product_price | product_image)

Please accomplish the following activities.
1. Copy the "retail_db.products" table to HDFS in a directory p93_products
2. Filter out all the empty prices
3. Sort all the products based on price in both ascending and descending order.
4. Sort all the products based on price as well as product_id in descending order.
5. Use the functions below to do data ordering or ranking and fetch the top 10 elements: 
top(), takeOrdered(), sortByKey()

Problem Scenario 80
You have been given a MySQL DB with the following details. 
user=retail_dba 
password=cloudera 
database=retail_db 
table=retail_db.products 
jdbc URL = jdbc:mysql://quickstart:3306/retail_db
Columns of products table: 
(product_id | product_category_id | product_name | product_description | product_price | product_image)

Please accomplish the following activities.
1. Copy the "retail_db.products" table to HDFS in a directory p93_products
2. Now sort the products data by product price per category; 
use the product_category_id column to group by category

# Problem Scenario 81:
"""You have been given MySQL DB with following details:
You have been given following product.csv file:
productID,productCode,name,quantity,price
1001,PEN,Pen Red,5000,1.23
1002,PEN,Pen Blue,8000,1.25
1003,PEN,Pen Black,2000,1.25
1004,PEC,Pencil 2B,10000,0.48
1005,PEC,Pencil 2H,8000,0.49
1006,PEC,Pencil HB,0,9999.99

Accomplish following activities.
1. Create a Hive ORC table using SparkSql
2. Load this data in Hive table.
3. Create a Hive parquet table using SparkSQL and load data in it."""

# Create df (the CSV file has a header row, so skip it)
b = spark.\
        read.\
        format('csv').\
        option('header', 'true').\
        schema("""product_id int, 
               product_code string, 
               name string, 
               quantity int, 
               price float""").\
        load("/Users/dbratu/Documents/big_data/1.csv")
b.show()
b.createTempView('tbl1')

# or create it directly as a Hive table (HiveQL, run in the Hive CLI rather than in Python)
create table tbl1(product_id int, 
                  product_code string, 
                  name string, 
                  quantity int, 
                  price float)
row format delimited fields terminated by ',' stored as textfile;

load data local inpath '/Users/dbratu/Documents/big_data/1.csv' into table tbl1;

# Create orc and parquet tables and load data into it
spark.sql("""create table tbl2(product_id int, 
                  product_code string, 
                  name string, 
                  quantity int, 
                  price float)
stored as orc;""")
spark.sql("""insert into tbl2 select * from tbl1;""")

spark.sql("""create table tbl_parquet(product_id int, 
                  product_code string, 
                  name string, 
                  quantity int, 
                  price float)
stored as parquet;""")
spark.sql("""insert into tbl_parquet select * from tbl1;""")

In [None]:
Problem Scenario 83
In continuation of the previous question, please accomplish the following activities.
1. Select all the records with quantity >= 5000 and a name starting with 'Pen'.
2. Select all the records with quantity >= 5000, price less than 1.24 and a name starting with 'Pen'.
3. Select all the records which do not have quantity >= 5000 and whose name does not start with 'Pen'.
4. Select all the products whose name is 'Pen Red' or 'Pen Black'.
5. Select all the products which have price BETWEEN 1.0 AND 2.0 AND quantity BETWEEN 1000 AND 2000.
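All five activities are plain SQL WHERE clauses. The sketch below runs them against the Problem 81 product rows. It uses Python's built-in sqlite3 module in place of Spark SQL so that it runs without a cluster. The same clauses should also work in `spark.sql("SELECT * FROM tbl1 WHERE ...")`, since tbl1 has quantity, price and name columns.

Two caveats apply:
- SQLite's LIKE is case-insensitive for ASCII, while Spark SQL's LIKE is case-sensitive. This makes no difference for this data.
- Item 3 is read here as NOT (quantity >= 5000 AND name LIKE 'Pen%'). The literal reading (quantity < 5000 AND name NOT LIKE 'Pen%') returns nothing, because every name starts with 'Pen'.

```python
import sqlite3

# Product rows from Problem Scenario 81
ROWS = [
    (1001, "PEN", "Pen Red", 5000, 1.23),
    (1002, "PEN", "Pen Blue", 8000, 1.25),
    (1003, "PEN", "Pen Black", 2000, 1.25),
    (1004, "PEC", "Pencil 2B", 10000, 0.48),
    (1005, "PEC", "Pencil 2H", 8000, 0.49),
    (1006, "PEC", "Pencil HB", 0, 9999.99),
]

# One WHERE clause per activity (two readings of activity 3)
FILTERS = {
    1: "quantity >= 5000 AND name LIKE 'Pen%'",
    2: "quantity >= 5000 AND price < 1.24 AND name LIKE 'Pen%'",
    3: "NOT (quantity >= 5000 AND name LIKE 'Pen%')",
    "3 (literal)": "quantity < 5000 AND name NOT LIKE 'Pen%'",
    4: "name IN ('Pen Red', 'Pen Black')",
    5: "price BETWEEN 1.0 AND 2.0 AND quantity BETWEEN 1000 AND 2000",
}


def run_filters():
    """Return {activity: [productID, ...]} for each WHERE clause."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE products (productID INTEGER, productCode TEXT, "
        "name TEXT, quantity INTEGER, price REAL)"
    )
    conn.executemany("INSERT INTO products VALUES (?, ?, ?, ?, ?)", ROWS)
    found = {}
    for activity, where in FILTERS.items():
        sql = f"SELECT productID FROM products WHERE {where} ORDER BY productID"
        found[activity] = [row[0] for row in conn.execute(sql)]
    conn.close()
    return found


results = run_filters()
for activity, ids in results.items():
    print(activity, ids)
```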

In [None]:
Problem Scenario 89
You have been given the patient data below in CSV format:
patientID,name,dateOfBirth,lastVisitDate
1001,Ah Teck,1991-12-31,2012-01-20
1002,Kumar,2011-10-29,2012-09-20
1003,Ali,2011-01-30,2012-10-21

Accomplish the following activities.
1. Find all the patients whose lastVisitDate is between '2012-09-15' and the current time.
2. Find all the patients who were born in 2011.
3. Find the age of every patient.
4. List the patients whose last visit was more than 60 days ago.
5. Select the patients who are 18 years old or younger.
##############################################################################################
some = spark.\
            read.\
            format('csv').\
            option('sep', ',').\
            load('/Users/dbratu/Documents/big_data/81.csv', header=True)
some.show()
# +---------+-------+-----------+-------------+
# |patientID|   name|dateOfBirth|lastVisitDate|
# +---------+-------+-----------+-------------+
# |     1001|Ah Teck| 1991-12-31|   2012-01-20|
# |     1002|  Kumar| 2011-10-29|   2012-09-20|
# |     1003|    Ali| 2011-01-30|   2012-10-21|
# +---------+-------+-----------+-------------+
some.createOrReplaceTempView("some_tbl")
spark.sql("""SELECT * FROM some_tbl 
WHERE TO_DATE(CAST(UNIX_TIMESTAMP(lastVisitDate, 'yyyy-MM-dd') AS TIMESTAMP)) 
BETWEEN '2012-09-15' AND current_timestamp() ORDER BY lastVisitDate""").show()
# +---------+-----+-----------+-------------+
# |patientID| name|dateOfBirth|lastVisitDate|
# +---------+-----+-----------+-------------+
# |     1002|Kumar| 2011-10-29|   2012-09-20|
# |     1003|  Ali| 2011-01-30|   2012-10-21|
# +---------+-----+-----------+-------------+
spark.sql("""SELECT * FROM some_tbl 
WHERE YEAR(TO_DATE(CAST(UNIX_TIMESTAMP(dateOfBirth, 'yyyy-MM-dd') AS TIMESTAMP))) = 2011""").show()
# +---------+-----+-----------+-------------+
# |patientID| name|dateOfBirth|lastVisitDate|
# +---------+-----+-----------+-------------+
# |     1002|Kumar| 2011-10-29|   2012-09-20|
# |     1003|  Ali| 2011-01-30|   2012-10-21|
# +---------+-----+-----------+-------------+
spark.sql("""SELECT name, dateOfBirth, 
                datediff(current_date(), TO_DATE(CAST(UNIX_TIMESTAMP(dateOfBirth, 'yyyy-MM-dd') AS TIMESTAMP)))/365 AS age FROM some_tbl""").show()
# (ages depend on current_date(); the output below is from the original run)
# +-------+-----------+------------------+
# |   name|dateOfBirth|               age|
# +-------+-----------+------------------+
# |Ah Teck| 1991-12-31|28.786301369863015|
# |  Kumar| 2011-10-29| 8.945205479452055|
# |    Ali| 2011-01-30|  9.69041095890411|
# +-------+-----------+------------------+
spark.sql("""SELECT name, lastVisitDate FROM some_tbl 
WHERE datediff(current_date(), TO_DATE(CAST(UNIX_TIMESTAMP(lastVisitDate, 'yyyy-MM-dd') AS TIMESTAMP))) > 60""").show()
# +-------+-------------+
# |   name|lastVisitDate|
# +-------+-------------+
# |Ah Teck|   2012-01-20|
# |  Kumar|   2012-09-20|
# |    Ali|   2012-10-21|
# +-------+-------------+
# Age <= 18 means the 19th birthday is still in the future
spark.sql("""SELECT * FROM some_tbl
WHERE TO_DATE(dateOfBirth) > add_months(current_date(), -19 * 12)""").show()
# The original notes used an 18-year cutoff (strictly younger than 18), in Scala:
# val results = sqlContext.sql("""SELECT * FROM patients WHERE
#     TO_DATE(CAST(UNIX_TIMESTAMP(dateOfBirth, 'yyyy-MM-dd') AS TIMESTAMP)) >
#     DATE_SUB(current_date(), 18*365)""")
# or in MySQL: SELECT * FROM patients WHERE dateOfBirth > DATE_SUB(CURRENT_DATE(), INTERVAL 18 YEAR);
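The SQL above approximates age as datediff / 365 and compares against current_date(), so its answers drift with the day you run it. The plain-Python sketch below covers the same five activities with calendar-exact whole-year ages. It uses a fixed reference date (an arbitrary assumption) so the results are reproducible, and it reads "18 or younger" as an age of at most 18 whole years.

```python
from datetime import date

# Patient rows from the problem: (patientID, name, dateOfBirth, lastVisitDate)
PATIENTS = [
    (1001, "Ah Teck", date(1991, 12, 31), date(2012, 1, 20)),
    (1002, "Kumar", date(2011, 10, 29), date(2012, 9, 20)),
    (1003, "Ali", date(2011, 1, 30), date(2012, 10, 21)),
]


def age_in_years(born, today):
    """Whole years since birth; subtract one if this year's birthday is still ahead."""
    return today.year - born.year - ((today.month, today.day) < (born.month, born.day))


def patient_report(today):
    """Answer activities 1-5 relative to the given reference date."""
    return {
        "visited_since_2012_09_15": [
            n for _, n, _, last in PATIENTS if date(2012, 9, 15) <= last <= today
        ],
        "born_in_2011": [n for _, n, born, _ in PATIENTS if born.year == 2011],
        "ages": {n: age_in_years(born, today) for _, n, born, _ in PATIENTS},
        "no_visit_for_60_days": [
            n for _, n, _, last in PATIENTS if (today - last).days > 60
        ],
        "aged_18_or_younger": [
            n for _, n, born, _ in PATIENTS if age_in_years(born, today) <= 18
        ],
    }


report = patient_report(date(2020, 10, 1))
print(report)
```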

In [None]:
Problem Scenario 91
You have been given the data below in JSON format:

{"first_name":"Ankit", "last_name":"Jain"}
{"first_name":"Amir", "last_name":"Khan"}
{"first_name":"Rajesh", "last_name":"Khanna"}
{"first_name":"Priynka", "last_name":"Chopra"}
{"first_name":"Kareena", "last_name":"Kapoor"}
{"first_name":"Lokesh", "last_name":"Yadav"}

Do the following activities:
1. Create an employee.json file locally.
2. Load this file to HDFS.
3. Register this data as a temp table in Spark using Python.
4. Write a select query and print this data.
5. Save the selected data back in JSON format.
##############################################################################################
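# Shell: create the file locally (paste the JSON lines above, then save with :wq)
# vi employee.json
# Shell: copy it to the HDFS working directory
# hadoop fs -put employee.json .

a = spark.read.json("/Users/dbratu/Documents/big_data/employee.json")
a.createOrReplaceTempView("tbl")
b = spark.sql("select * from tbl where last_name like 'K%'")
b.show()
b.write.json('2b.json')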
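spark.read.json expects JSON Lines (one object per line), which is how the data above is laid out. The plain-Python sketch below does the same read, filter and write round trip with the standard json module. It mirrors the `last_name like 'K%'` query from the Spark cell, which is just an example query.

```python
import json

# employee.json content from the problem, one JSON object per line (JSON Lines)
EMPLOYEE_JSON = """{"first_name":"Ankit", "last_name":"Jain"}
{"first_name":"Amir", "last_name":"Khan"}
{"first_name":"Rajesh", "last_name":"Khanna"}
{"first_name":"Priynka", "last_name":"Chopra"}
{"first_name":"Kareena", "last_name":"Kapoor"}
{"first_name":"Lokesh", "last_name":"Yadav"}"""


def select_last_name_prefix(json_lines, prefix):
    """Parse JSON Lines and keep records whose last_name starts with prefix (like LIKE 'K%')."""
    records = [json.loads(line) for line in json_lines.splitlines() if line.strip()]
    return [r for r in records if r["last_name"].startswith(prefix)]


def to_json_lines(records):
    """Serialize records back to compact JSON Lines, one object per line."""
    return "\n".join(json.dumps(r, separators=(",", ":")) for r in records)


selected = select_last_name_prefix(EMPLOYEE_JSON, "K")
output = to_json_lines(selected)
print(output)
```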

In [None]:
Problem Scenario 95
You have to run your Spark application on YARN with a maximum heap size of 512MB for each
executor and 1 processor core allocated to each executor. Your main application requires
three values as input arguments: V1 V2 V3.

Please replace XXX, YYY, ZZZ:
./bin/spark-submit --class com.hadoopexam.MyTask --master yarn-cluster --num-executors 3
--driver-memory 512m XXX YYY lib/hadoopexam.jar ZZZ
##############################################################################################
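# XXX = --executor-memory 512M, YYY = --executor-cores 1, ZZZ = V1 V2 V3
# (on Spark 2.x and later, --master yarn --deploy-mode cluster replaces the deprecated yarn-cluster)
./bin/spark-submit \
        --class com.hadoopexam.MyTask \
        --master yarn-cluster \
        --num-executors 3 \
        --driver-memory 512M \
        --executor-memory 512M \
        --executor-cores 1 \
        lib/hadoopexam.jar V1 V2 V3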

In [None]:
24 : You have been given the comma-separated employee information below.
name,salary,sex,age
alok,100000,male,29
jatin,105000,male,32
yogesh,134000,male,39
ragini,112000,female,35
jyotsana,129000,female,39
valmiki,123000,male,29

Use the netcat service on port 44444 and send the data above line by line with nc.
Please do the following activities.
1. Create a Flume conf file using the fastest channel, which writes data into the Hive warehouse
directory, in a table called flumemaleemployee (create the Hive table for the given data as well).
2. While importing, make sure only male employee data is stored.

In [None]:
FLUME
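Below is a minimal sketch of a Flume agent for this task. Several parts are assumptions:
- The agent name a1, the component names and the file name are hypothetical.
- It uses a NetCat source on localhost:44444.
- It uses a memory channel, the usual answer for "fastest channel" because events stay in RAM (and are lost if the agent dies).
- The HDFS sink targets /user/hive/warehouse/flumemaleemployee, which assumes Hive's default hive.metastore.warehouse.dir.

A regex_filter interceptor keeps only events containing ",male,". This drops both the female rows and the header line.

```properties
# flume_male_employee.conf (hypothetical name)
# Hive table to create first (Hive shell):
#   CREATE TABLE flumemaleemployee (name STRING, salary INT, sex STRING, age INT)
#   ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

a1.sources = r1
a1.channels = c1
a1.sinks = k1

# NetCat source: every line sent with nc becomes one event
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.channels = c1

# Keep only events that contain ",male," (drops female rows and the header)
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_filter
a1.sources.r1.interceptors.i1.regex = ,male,
a1.sources.r1.interceptors.i1.excludeEvents = false

# Memory channel: fastest, but not durable
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# HDFS sink writing plain text into the table's warehouse directory
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/hive/warehouse/flumemaleemployee
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.channel = c1
```

Start the agent with `flume-ng agent --conf conf --conf-file flume_male_employee.conf --name a1`. Then run `nc localhost 44444` and paste the rows one by one.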

In [None]:
27 : You need to implement a near real-time solution for collecting information
    when it is submitted in files with the information below.

echo "IBM,100,20160104" >> /tmp/spooldir/bb/.bb.txt
echo "IBM,103,20160105" >> /tmp/spooldir/bb/.bb.txt
mv /tmp/spooldir/bb/.bb.txt /tmp/spooldir/bb/bb.txt

echo "IBM,100.2,20160104" >> /tmp/spooldir/dr/.dr.txt
echo "IBM,103.1,20160105" >> /tmp/spooldir/dr/.dr.txt
mv /tmp/spooldir/dr/.dr.txt /tmp/spooldir/dr/dr.txt

You have been given the directory location below (if it is not available, create it): /tmp/spooldir.
You have a financial subscription for getting stock prices from Bloomberg as well as
Reuters, and using FTP you download new files every hour from their respective FTP sites into the directories /tmp/spooldir/bb and /tmp/spooldir/dr respectively.
As soon as a file is committed in these directories, it needs to be available in HDFS in
the /tmp/flume/finance location, in a single directory.
Write a Flume configuration file named flume7.conf and use it to load data into HDFS with the following additional properties.
1. Spool /tmp/spooldir/bb and /tmp/spooldir/dr.
2. The file prefix in HDFS should be events.
3. The file suffix should be .log.
4. If a file is not committed and still in use, it should have _ as a prefix.
5. Data should be written as text to HDFS.

In [None]:
FLUME
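Below is a minimal sketch of flume7.conf. It assumes an agent named a1 and a memory channel, since the task does not name a channel type. Two spooldir sources feed one channel, so both feeds land in the single HDFS directory /tmp/flume/finance:
- hdfs.filePrefix, hdfs.fileSuffix and hdfs.inUsePrefix cover requirements 2 to 4.
- hdfs.fileType = DataStream writes plain text.

The echo/mv pattern in the task writes into a hidden dot file first and renames it only when it is complete. This is presumably so the spooling source never picks up a half-written file.

```properties
a1.sources = bb dr
a1.channels = c1
a1.sinks = k1

# Spooling directory sources, one per provider
a1.sources.bb.type = spooldir
a1.sources.bb.spoolDir = /tmp/spooldir/bb
a1.sources.bb.channels = c1

a1.sources.dr.type = spooldir
a1.sources.dr.spoolDir = /tmp/spooldir/dr
a1.sources.dr.channels = c1

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# One HDFS directory for both feeds
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /tmp/flume/finance
a1.sinks.k1.hdfs.filePrefix = events
a1.sinks.k1.hdfs.fileSuffix = .log
# Files Flume is still writing get the _ prefix
a1.sinks.k1.hdfs.inUsePrefix = _
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.channel = c1
```

Run it with `flume-ng agent --conf conf --conf-file flume7.conf --name a1`.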

In [None]:
57 : You have been given the code snippet below.
    val a = sc.parallelize(1 to 9, 3)
    operation1
Write a correct code snippet for operation1 which will produce:

Array[(String, Seq[Int])] = Array((even,ArrayBuffer(2, 4, 6, 8)), (odd,ArrayBuffer(1, 3, 5, 7, 9)))

In [None]:
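a = sc.parallelize(range(1, 10), 3)
# Scala: a.groupBy(x => if (x % 2 == 0) "even" else "odd").collect
grouped = a.groupBy(lambda x: "even" if x % 2 == 0 else "odd")
print(grouped.mapValues(list).collect())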

In [None]:
34 : You have been given a file named spark6/user.csv.
id,topic,hits
Rahul,scala,120
Nikita,spark,80
Mithun,spark,1
myself,cca175,180

Now write Spark code in Scala which will remove the header part and create an RDD of values
as below for all rows. Also, if the id is "myself", filter out that row.
Map(id -> Rahul, topic -> scala, hits -> 120)

In [None]:
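from pyspark import SparkConf, SparkContext

# getOrCreate avoids "Cannot run multiple SparkContexts" when cells are re-run
sc = SparkContext.getOrCreate(SparkConf().setAppName("app").setMaster("local"))

dummy = """id,topic,hits
Rahul,scala,120
Nikita,spark,80
Mithun,spark,1
myself,cca175,180"""

a = sc.parallelize(dummy.splitlines())
a_m = a.map(lambda x: x.split(","))
header = a_m.first()
# Drop the header row and the "myself" row
a_m_f = a_m.filter(lambda x: (x[0] != header[0]) and (x[0] != 'myself'))
# Pair every value with its column name, like Scala's Map(id -> ..., topic -> ..., hits -> ...)
a_m_f_d = a_m_f.map(lambda x: dict(zip(header, x)))
print(*a_m_f_d.collect(), sep="\n")
a_m_f_d.saveAsTextFile("dummy.txt")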

In [None]:
91 : Same task as Problem Scenario 91 above (employee.json); alternative solutions:

In [None]:
content = spark.read.json("file.json")
content.createTempView("tabl")
data = spark.sql("select last_name from tabl limit 3")
data.write.json("onl3")

In [None]:
employee = spark.read.json("file.json")
# Parquet output is a directory of Parquet part files, so avoid a .json name
employee.write.parquet("employee.parquet")
parq_data = spark.read.parquet("employee.parquet")

In [None]:
2 : There is a parent organization called "ABC Group Inc", which has two child companies
    named Tech Inc and MPTech.
Both companies' employee information is given in two separate text files, as below.

Please do the following activities for the employee details.
Tech Inc.txt:
1,Alok,Hyderabad
2,Krish,Hongkong
3,Jyoti,Mumbai
4,Atul,Banglore
5,Ishan,Gurgaon

MPTech.txt:
6,John,Newyork
7,alp2004,California
8,tellme,Mumbai
9,Gagan21,Pune
10,Mukesh,Chennai

1. Which command will you use to check all the available command-line options on HDFS,
and how will you get help for an individual command?
2. Create a new empty directory named Employee using the command line, and create an
empty file named Techinc.txt in it.
3. Load both companies' employee data into the Employee directory (how do you overwrite an existing file
in HDFS?).
4. Merge both companies' employee data into a single file called MergedEmployee.txt; the merged file
should have a new line character at the end of each file's content.
5. Upload the merged file to HDFS and change its permissions on HDFS so that the
owner and group members can read and write, and other users can read the file.
6. Write a command to export an individual file as well as the entire directory from HDFS to the
local file system.

In [None]:
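Merging the files with the hdfs dfs command: hdfs dfs -getmerge -nl Employee MergedEmployee.txt
(-nl adds a newline character at the end of each file)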
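Requirement 4 is what `hdfs dfs -getmerge -nl` does: it concatenates the files of a directory into one local file and adds a newline after each file's content. The plain-Python sketch below reproduces that behaviour on local files, using the first two rows of each company file. `merge_with_newlines` is a hypothetical helper, and merging in file-name order is an assumption.

```python
import tempfile
from pathlib import Path


def merge_with_newlines(source_dir, target):
    """Concatenate every file in source_dir (name order) into target, adding a newline after each file."""
    with open(target, "w") as out:
        for path in sorted(Path(source_dir).iterdir()):
            out.write(path.read_text())
            # The -nl behaviour: without it, one file's last line runs into the next file's first line
            out.write("\n")


with tempfile.TemporaryDirectory() as workdir:
    employee_dir = Path(workdir, "Employee")
    employee_dir.mkdir()
    # Files deliberately saved without a trailing newline
    (employee_dir / "MPTech.txt").write_text("6,John,Newyork\n7,alp2004,California")
    (employee_dir / "Techinc.txt").write_text("1,Alok,Hyderabad\n2,Krish,Hongkong")
    merged_path = Path(workdir, "MergedEmployee.txt")
    merge_with_newlines(employee_dir, merged_path)
    merged_text = merged_path.read_text()

print(merged_text)
```

On the cluster, requirement 5 would follow with `hdfs dfs -put MergedEmployee.txt Employee/` and `hdfs dfs -chmod 664 Employee/MergedEmployee.txt`, where 664 means rw-rw-r--.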

In [None]:
9 : You have been given a MySQL database:
    user=retail_dba 
    password=cloudera 
    database=retail_db 
    jdbc URL = jdbc:mysql://quickstart:3306/retail_db

Please accomplish the following.
1. Import the departments table into a directory.
2. Import the departments table into the same directory again
(the directory already exists, so the import should not overwrite it but append the results).
3. Also make sure your result fields are terminated by '|' and lines are terminated by '\n'.

In [None]:
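from pyspark.sql import SparkSession

# getOrCreate() reuses a running session instead of failing on a second SparkContext;
# the MySQL JDBC driver jar must be on Spark's classpath
spark = SparkSession.\
    builder.\
    appName("app").\
    master('local').\
    getOrCreate()
sc = spark.sparkContext

departments = spark.\
    read.\
    format("jdbc").\
    options(url="jdbc:mysql://quickstart:3306/retail_db",
            driver="com.mysql.jdbc.Driver",
            dbtable="departments",
            user="retail_dba",
            password="cloudera").\
    load()

# Append to an existing directory with '|' between fields (each row already ends with '\n');
# the directory name "departments" is an assumption
departments.write.mode("append").option("sep", "|").csv("departments")

# Sqoop equivalent:
# sqoop import --connect jdbc:mysql://quickstart:3306/retail_db --username retail_dba \
#   --password cloudera --table departments --target-dir departments --append \
#   --fields-terminated-by '|' --lines-terminated-by '\n'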

In [None]:
68 : You have been given the file below.
spark75/file1.txt
The file contains some text:

"""Apache Hadoop is an open-source software framework written in Java for distributed storage and distributed processing of very large data sets on computer clusters built from commodity hardware. All the modules in Hadoop are designed with a fundamental assumption that hardware failures are common and should be automatically handled by the framework.
The core of Apache Hadoop consists of a storage part known as Hadoop Distributed File
System (HDFS) and a processing part called MapReduce. Hadoop splits files into large blocks and distributes them across nodes in a cluster. To process data, Hadoop transfers packaged code for nodes to process in parallel based on the data that needs to be processed. This approach takes advantage of data locality, nodes manipulating the data they have access to, to allow the dataset to be processed faster and more efficiently than it would be in a more conventional supercomputer architecture that relies on a parallel file system where computation and data are distributed via high-speed networking.
For a slightly more complicated task, let's look into splitting up sentences from our documents into word bigrams. A bigram is a pair of successive tokens in some sequence.
We will look at building bigrams from the sequences of words in each sentence, and then try to find the most frequently occurring ones."""

The first problem is that values in each partition of our initial RDD describe lines
from the file rather than sentences. Sentences may be split over multiple lines.
The glom() RDD method is used to create a single entry for each document containing the
list of all lines; we can then join the lines up and resplit them into sentences using "."
as the separator, using flatMap so that every object in our RDD is now a sentence.
A bigram is a pair of successive tokens in some sequence. Please build bigrams from the sequences
of words in each sentence, and then find the most frequently occurring ones.
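The steps described above are: rejoin the lines of a document, resplit on ".", then pair up successive words. The plain-Python sketch below follows those steps. The three sample lines are hypothetical and were chosen so that one sentence crosses a line break, which is why the lines must be joined before splitting.

```python
from collections import Counter

# Hypothetical lines; the sentence "Spark splits files." spans a line break
LINES = [
    "Hadoop splits files.",
    "Hadoop splits blocks. Spark",
    "splits files.",
]


def bigram_counts(lines):
    """Count word bigrams per sentence, after rejoining all lines into one document."""
    document = " ".join(line.strip() for line in lines)  # what glom() + join achieves
    counts = Counter()
    for sentence in document.split("."):  # resplit into sentences on "."
        words = sentence.split()
        counts.update(zip(words, words[1:]))  # successive word pairs
    return counts


counts = bigram_counts(LINES)
print(counts.most_common(2))
```

A PySpark version might chain `glom()`, `map(" ".join)`, `flatMap(lambda text: text.split("."))`, a `flatMap` that emits word pairs, and `reduceByKey`. Note that glom() works per partition. The whole document is only seen at once if the file sits in a single partition, for example with `sc.textFile(path, 1)`.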



In [None]:
64 : You have been given the code snippet below:
a = sc.parallelize(["dog", "salmon", "salmon", "rat", "elephant"])
b = a.keyBy(_.length)
c = sc.parallelize(["dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"])
d = c.keyBy(_.length)

Write a correct code snippet which will produce the desired output:
Array[(Int, (Option[String], String))] = Array((6,(Some(salmon),salmon)),
(6,(Some(salmon),rabbit)), (6,(Some(salmon),turkey)), (6,(Some(salmon),salmon)),
(6,(Some(salmon),rabbit)), (6,(Some(salmon),turkey)), (3,(Some(dog),dog)),
(3,(Some(dog),cat)), (3,(Some(dog),gnu)), (3,(Some(dog),bee)), (3,(Some(rat),dog)),
(3,(Some(rat),cat)), (3,(Some(rat),gnu)), (3,(Some(rat),bee)), (4,(None,wolf)),
(4,(None,bear)))

In [None]:
a = sc.parallelize(["dog", "salmon", "salmon", "rat", "elephant"])
b = a.keyBy(lambda x: len(x))
c = sc.parallelize(["dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"])
d = c.keyBy(lambda x: len(x))
print(b.collect())
print(d.collect())
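# Option on the left side means rightOuterJoin: every key of d is kept, "elephant" (length 8) is dropped
e = b.rightOuterJoin(d)
print(e.collect())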
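The Option[String] on the left side marks a right outer join:
- Every element of d survives.
- Keys with no partner in b (the 4-letter words) get None.
- elephant (length 8, only in b) disappears.

The plain-Python sketch below shows those semantics on the same data. `right_outer_join` is a hypothetical helper, not a Spark API. PySpark's rightOuterJoin shows None where Scala shows None, and the bare value where Scala shows Some(...).

```python
from collections import defaultdict

animals_b = ["dog", "salmon", "salmon", "rat", "elephant"]
animals_d = ["dog", "cat", "gnu", "salmon", "rabbit", "turkey", "wolf", "bear", "bee"]
b = [(len(x), x) for x in animals_b]  # keyBy(len)
d = [(len(x), x) for x in animals_d]


def right_outer_join(left, right):
    """Keep every (k, w) from right; pair it with each left value for k, or with None."""
    left_by_key = defaultdict(list)
    for k, v in left:
        left_by_key[k].append(v)
    joined = []
    for k, w in right:
        matches = left_by_key.get(k, [])
        if matches:
            joined.extend((k, (v, w)) for v in matches)
        else:
            joined.append((k, (None, w)))
    return joined


e = right_outer_join(b, d)
print(e)
```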

In [None]:
56 : You have been given the code snippet below:
a = sc.parallelize(1 to 100, 3)

Write a correct code snippet which will produce:
Array[Array[Int]] = Array(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20,
21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33),
Array(34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55,
56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66),
Array(67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88,
89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100))

In [None]:
# range's end is exclusive: range(1, 101) matches Scala's 1 to 100
a = sc.parallelize(range(1, 101), numSlices=3)
print(a.glom().collect())
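Why 33/33/34? As far as we can tell from Spark's ParallelCollectionRDD, parallelize cuts a local collection into contiguous slices. Slice i covers the indices from i * length / numSlices up to (i + 1) * length / numSlices, using integer division. The plain-Python sketch below reproduces that arithmetic. `glom_like` is a hypothetical stand-in for `parallelize(...).glom().collect()`.

```python
def slice_bounds(length, num_slices):
    """(start, end) index pairs: slice i covers [i*length//n, (i+1)*length//n)."""
    return [(i * length // num_slices, (i + 1) * length // num_slices) for i in range(num_slices)]


def glom_like(data, num_slices):
    """Split a list into num_slices contiguous chunks, one list per partition."""
    return [data[start:end] for start, end in slice_bounds(len(data), num_slices)]


partitions = glom_like(list(range(1, 101)), 3)
for part in partitions:
    print(part[0], "...", part[-1], f"({len(part)} elements)")
```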

In [None]:
31 : You have been given the following two files:
1. Content.txt: a huge text file containing space-separated words.
2. Remove.txt: ignore/filter all the words given in this file (comma-separated).

Write a Spark program which reads the Content.txt file and loads it as an RDD,
removes all the words contained in a broadcast variable (which is loaded as an RDD of words from
Remove.txt),
counts the occurrences of each word, and saves the result as a text file in HDFS.

Content.txt
Hello this is ABCTech.com
This is TechABY.com
Apache Spark Training
This is Spark Learning Session
Spark is faster than MapReduce

Remove.txt
Hello, is, this, the

In [None]:
s = """Hello this is ABCTech.com
This is TechABY.com
Apache Spark Training
This is Spark Learning Session
Spark is faster than MapReduce"""

remove = "Hello, is, this, the"
rem_rdd = sc.parallelize(remove.split(",")).map(lambda x: x.strip())
br = sc.broadcast(rem_rdd.collect())

text = sc.parallelize(s.splitlines())
text_m = text.flatMap(lambda x: x.split(" "))
text_m_f = text_m.filter(lambda x: x not in br.value)
text_m_f_f = text_m_f.map(lambda x: (x, 1))
counter = text_m_f_f.reduceByKey(lambda x, y: x + y)
print(text_m_f.collect())
print(counter.collect())
counter.saveAsTextFile("dumm")

In [None]:
3 : You have been given a MySQL DB with the following details:
user=retail_dba 
password=cloudera 
database=retail_db 
table=retail_db.categories 
jdbc URL = jdbc:mysql://quickstart:3306/retail_db

Please accomplish the following activities.
1. Import data from the categories table where category=22 (data should be stored in categories_subset).
2. Import data from the categories table where category>22 (data should be stored in categories_subset_2).
3. Import data from the categories table where category is between 1 and 22 (data should be stored in categories_subset_3).
4. While importing categories data, change the delimiter to '|' (data should be stored in categories_subset_S).
5. Import data from the categories table and restrict the import to the category_name and category_id columns only, with '|' as the delimiter.
6. Add null values to the table using the SQL statements below: ALTER TABLE categories modify category_department_id int(11); INSERT INTO categories values
(60, NULL, 'TESTING');
7. Import data from the categories table (into the categories_subset_17 directory) using the '|' delimiter and category_id between 1 and 61, and encode null values for both string and non-string columns.
8. Import the entire retail_db schema into a directory categories_subset_all_tables.
8. Import entire schema retail_db in a directory categories_subset_all_tables

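Solution hints (sqoop): use sqoop import with --where for the category filters, --target-dir for each directory, --fields-terminated-by '|' for the delimiter, --columns category_name,category_id to restrict the columns, --null-string and --null-non-string to encode nulls,
and sqoop import-all-tables --warehouse-dir categories_subset_all_tables for the whole schema.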

In [None]:
71 : Write a Spark script in Python which reads a file "Content.txt"
(on HDFS) with the following content.
After that, split each row into (key, value), where the key is the first word in the line and the entire line is the value.
Filter out the empty lines.
Save these key-value pairs in "problem86" as a sequence file (on HDFS).
Also save a sequence file where the key is null and the entire line is the value.
Read back the stored sequence files.

Content.txt:

Hello this is ABCTECH.com

This is XYZTECH.com

Apache Spark Training

This is Spark Learning Session

Spark is faster than MapReduce

In [None]:
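from pyspark import SparkConf, SparkContext

sc = SparkContext.getOrCreate(SparkConf().setAppName("app").setMaster("local"))

a = """Hello this is ABCTECH.com

This is XYZTECH.com

Apache Spark Training

This is Spark Learning Session

Spark is faster than MapReduce"""

txt = sc.parallelize(a.splitlines())
# Filter out empty lines, then key each line by its first word
txt_m_f = txt.filter(lambda x: x.strip() != "").map(lambda x: (x.split(" ")[0], x))
print(txt_m_f.collect())

# Save as a sequence file and read it back
txt_m_f.saveAsSequenceFile("problem86")
print(sc.sequenceFile("problem86").collect())

# Null key: PySpark writes None keys as NullWritable (output directory name is an assumption)
txt.filter(lambda x: x.strip() != "").map(lambda x: (None, x)).saveAsSequenceFile("problem86_null_key")
print(sc.sequenceFile("problem86_null_key").collect())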

In [None]:
45 : You have been given 2 files with the content below.
(spark12/technology.txt)
first,last,technology

Amit,Jain,java

Lokesh,kumar,unix

Mithun,kale,spark

Rajni,vekat,hadoop

Rahul,Yadav,scala

(spark12/salary.txt)
first,last,salary

Amit,Jain,100000

Lokesh,kumar,95000

Mithun,kale,150000

Rajni,vekat,154000

Rahul,Yadav,120000

Write a Spark program which will join the data based on first and last name and
save the joined results in the following format: first,last,technology,salary

In [None]:
from pyspark import SparkConf, SparkContext

sc = SparkContext.getOrCreate(SparkConf().setAppName("app").setMaster("local"))

technology = """
first,last,technology

Amit,Jain,java

Lokesh,kumar,unix

Mithun,kale,spark

Rajni,vekat,hadoop

Rahul,Yadav,scala"""

salary = """first,last,salary

Amit,Jain,100000

Lokesh,kumar,95000

Mithun,kale,150000

Rajni,vekat,154000

Rahul,Yadav,120000"""

# join the data based on first and last name and save as
# first,last,technology,salary


def to_pair(line):
    """'first,last,value' -> ((first, last), value); strip(" -") also drops stray ' -' suffixes."""
    first, last, value = [field.strip(" -") for field in line.split(",")]
    return ((first, last), value)


technology_rdd = sc.\
    parallelize(technology.splitlines()).\
    filter(lambda x: len(x.strip()) > 0).\
    map(to_pair).\
    filter(lambda x: x[0][0] != 'first')
salary_rdd = sc.\
    parallelize(salary.splitlines()).\
    filter(lambda x: len(x.strip()) > 0).\
    map(to_pair).\
    filter(lambda x: x[0][0] != 'first')

# ((first, last), (technology, salary)) -> "first,last,technology,salary"
joined = technology_rdd.join(salary_rdd)
result = joined.map(lambda x: ",".join([x[0][0], x[0][1], x[1][0], x[1][1]]))
print(result.collect())
result.saveAsTextFile("spark12/joined")  # output directory name is an assumption

In [None]:
54 : You have been given the code snippet below.
a = sc.parallelize(["dog", "tiger", "lion", "cat", "panther", "eagle"]) 
b = a.map(x => (x.length, x))

Write a correct code snippet which will produce desired output, shown below.
Array[(Int, String)] = Array((4,lion), (7,panther), (3,dogcat), (5,tigereagle))

In [None]:
from pyspark import SparkConf, SparkContext

sc = SparkContext.getOrCreate(SparkConf().setAppName("app").setMaster("local"))

a = sc.parallelize(["dog", "tiger", "lion", "cat", "panther", "eagle"])
b = a.map(lambda x: (len(x), x))
print(a.collect())
print(b.collect())

c = b.reduceByKey(lambda x, y: x + y)
print(c.collect())

In [None]:
52 : You have been given the code snippet below.
b = sc.parallelize([1,2,3,4,5,6,7,8,2,4,2,1,1,1,1,1])

Write a correct code snippet for Operation_xyz which will produce the output below.
(5 -> 1, 8 -> 1, 3 -> 1, 6 -> 1, 1 -> 6, 2 -> 3, 4 -> 2, 7 -> 1)

In [None]:
from pyspark import SparkConf, SparkContext

sc = SparkContext.getOrCreate(SparkConf().setAppName("app").setMaster("local"))

b = sc.parallelize([1,2,3,4,5,6,7,8,2,4,2,1,1,1,1,1])

print(b.countByValue())
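countByValue() tallies how often each value occurs and returns that mapping to the driver as a dict-like object. Counting the same list locally with collections.Counter is a quick way to check the expected output, including the six occurrences of 1.

```python
from collections import Counter

# Same list as in the snippet above
values = [1, 2, 3, 4, 5, 6, 7, 8, 2, 4, 2, 1, 1, 1, 1, 1]
counts = Counter(values)  # value -> number of occurrences, the same shape countByValue() returns
print(dict(counts))
```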

In [None]:
81 : You have been given the following product.csv file:
productID,productCode,name,quantity,price
1001,PEN,Pen Red,5000,1.23
1002,PEN,Pen Blue,8000,1.25
1003,PEN,Pen Black,2000,1.25
1004,PEC,Pencil 2B,10000,0.48
1005,PEC,Pencil 2H,8000,0.49
1006,PEC,Pencil HB,0,9999.99

1. Create a Hive ORC table using Spark SQL.
2. Load this data into the Hive table.
3. Create a Hive Parquet table using Spark SQL and load the data into it.