# Case Study Adidas

## This notebook demonstrates the capabilities of Zeppelin notebooks through worked examples of data cleansing, pre-processing, and loading data into an external warehouse.

### Pre-requisites
#####         EMR Cluster with Zeppelin
#####         IAM Roles with appropriate permissions - EC2, S3
#####         Data from Open Library Loaded to S3 bucket



## Initialising the Spark Session

In [2]:
%pyspark
# Bare reference to the session variable (presumably pre-bound by the Zeppelin
# %pyspark interpreter - confirm); nothing is assigned or modified here.
spark

In [3]:
%pyspark
# Print every entry of the active Spark configuration, one entry per line.
spark_conf_list = spark.sparkContext.getConf().getAll()
for conf_entry in spark_conf_list:
    print(conf_entry)

In [4]:
%pyspark
# SparkSession is otherwise only imported much later in the notebook; import it
# here so this paragraph also works when the notebook is run top to bottom.
from pyspark.sql import SparkSession

# Overlay the desired settings on the configuration of the running context and
# hand the result to the session builder.
# NOTE(review): this goes through the private `_conf` attribute, and settings
# such as executor/driver memory are presumably fixed once the context is up -
# confirm they take effect, or set them in the interpreter settings instead.
conf = spark.sparkContext._conf.setAll([('spark.executor.memory', '4g'),
                                        ('spark.app.name', 'case-study-adidas'),
                                        ('spark.executor.cores', '4'),
                                        ('spark.cores.max', '4'),
                                        ('spark.driver.memory', '4g')])
spark = SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()

In [5]:
%spark.conf
#spark.submit.deployMode client

In [6]:
%sh
# Stop if the target directory is missing, so the clone never lands in
# whatever directory the shell happened to start in.
cd /mnt/var/lib/zeppelin/git-repo || exit 1
# Clone straight into the current directory ("."), not into a sub-directory.
git clone https://github.com/Prashast18/data-playground-adidas.git .


In [7]:
%sh
# Stop if the target directory is missing, so nothing below runs elsewhere.
cd /mnt/var/lib/zeppelin/git-repo || exit 1

# Clone only when there is no checkout yet: git refuses to clone into a
# directory that is not empty.
if [ ! -d .git ]; then
    git clone https://github.com/Prashast18/data-playground-adidas.git .
fi

# Work on a local "dev" branch that starts from origin/master.
git checkout -b dev origin/master
# NOTE(review): 777 makes the tree world-writable; tighten if the Zeppelin
# user is the only writer.
chmod -R 777 /mnt/var/lib/zeppelin/git-repo/Zeppelin/

In [8]:
%sh
# Copy the notebooks into the Git working tree. -r is needed so that entries
# which are directories are copied as well instead of being skipped by cp.
cp -r /mnt/var/lib/zeppelin/notebook/* /mnt/var/lib/zeppelin/git-repo/Zeppelin/

In [9]:
%sh

# Default location for Zeppelin notebooks:
#       /mnt/var/lib/zeppelin/notebook/

# Zeppelin notebooks need to be exported to .ipynb files before pushing to Git.

# git acts on the repository of the current directory, so enter the clone first.
cd /mnt/var/lib/zeppelin/git-repo || exit 1

# The source is a directory, so cp needs -r; the trailing "/." copies the
# directory's contents rather than nesting a "notebook" directory.
cp -r /mnt/var/lib/zeppelin/notebook/. /mnt/var/lib/zeppelin/git-repo/Zeppelin/
git add -A
git commit -m "Commit from Zeppelin"
git push origin

In [10]:
%pyspark
# Load the dump from S3 as JSON; no schema is passed to the reader.
df=  spark.read.json("s3://my-aws-staging-bucket/ol_cdump/ol_cdump.json")

In [11]:
%pyspark
# Preview at most 10 rows through the notebook's `z.show` display helper.
z.show(df.limit(10))

In [12]:
%pyspark

# Number of rows in the raw DataFrame.
df.count()

In [13]:
%pyspark
# Print the column names and types of the raw DataFrame.
df.printSchema()


In [14]:
%pyspark

# Summary statistics over the DataFrame's columns; at most 5 rows of the
# resulting summary are displayed.
summary_df = df.describe()
z.show(summary_df.limit(5))

""" Content Discovery:

1- The following fields are present in every record
                revision
                latest_revision
                key
 
2- The following fields are almost entirely empty and can be treated as less relevant columns.
                website
                fuller_name
                full_title
                ia_loaded_id
                bio
                birth_date
                death_date
                personal_name (can be a trivia)
                
"""
 

In [15]:
%pyspark
# Summary statistics restricted to the four listed columns.
z.show(df.describe("latest_revision", "revision", "number_of_pages" ,  "weight"))

"""Statistical Analysis Observations
            1- Min & max revision: 1 and 50
            2- latest_revision & revision hold the same data
            3- Min & max number_of_pages: 0 and 48418 (both look unrealistic)
"""

In [16]:
%pyspark

# Count the "missing" values per column. A value counts as missing when it is
# null, NaN, an empty string, or contains the string literal 'None' or 'NULL'.

from pyspark.sql.functions import col,isnan,when,count

# Kept as a plain variable: an attribute stashed on the DataFrame object is not
# part of the DataFrame and does not carry over to any DataFrame derived from it.
filter_cols = ["bio", "birth_date", "by_statement","key", "latest_revision", "title", "publish_country"]


def _missing_count(column_name):
    """Return an aggregate counting the rows where *column_name* looks missing."""
    column = col(column_name)
    looks_missing = (
        column.contains('None')
        | column.contains('NULL')
        | (column == '')
        | column.isNull()
        | isnan(column_name)
    )
    # when() without otherwise() yields null for non-matching rows, and
    # count() only counts non-null values.
    return count(when(looks_missing, column_name)).alias(column_name)


count_null_df = df.select([_missing_count(c) for c in filter_cols])

In [17]:
%pyspark
# Print the per-column counts held in count_null_df.
count_null_df.show()

In [18]:
%pyspark

# Publish country can be omitted, but number of authors who co-authored becomes irrelevant
null_cols = ["title", "number_of_pages","publish_date" ]

# Drop every row that has a null in any of the columns listed above.
cleaned_df= df.na.drop(subset=null_cols)


# Based on the business logic, we can fill the columns with some static value.
# Keep the DataFrame itself in massaged_df: show() only prints and returns None,
# so chaining it onto the assignment would leave the variable empty.
massaged_df = df.na.fill("Unknown", ["bio", "birth_date"]).select("bio", "birth_date")
massaged_df.show(2)

In [19]:
%pyspark
# Number of rows left in cleaned_df.
cleaned_df.count()

In [20]:
%pyspark

# The following columns hold (almost) no values in the given data. They could be
# used for detecting outliers, but we drop them for simplicity:
#                 website
#                 fuller_name
#                 full_title
#                 ia_loaded_id
#                 bio
#                 birth_date
#                 death_date
#                 personal_name
#                 ia_box_id
#                 isbn_invalid
#                 isbn_odd_length

# DataFrame.drop silently ignores names that are not columns, so a misspelt name
# leaves its column in place without any error. The date-of-death field is
# presumably called `death_date` (TODO confirm against df.printSchema()); the
# earlier `death_rate` spelling is kept too, which is harmless if it is absent.
unimp_cols = [
    'full_title', 'fuller_name', 'ia_box_id', 'ia_loaded_id',
    'isbn_invalid', 'isbn_odd_length', 'website', 'bio', 'birth_date',
    'death_date', 'death_rate', 'personal_name',
]
cleaned_df = cleaned_df.drop(*unimp_cols)



In [21]:
%pyspark
# Show up to 100 rows of the similarly named column pairs side by side.
cleaned_df.select('authors.key','authors.author.key','publish_date','first_publish_date', 'work_title','work_titles', 'subject_time','subject_times').show(100)

In [22]:
%pyspark

# Rules for the base data set:
#                     Valid Publish date & country
#                     Not without title
#                     Valid Birth Date
#                     Number of pages - 0
#                     Publish year > 1950
#                     Valid Publish Year  - Publish Year has Junk Values - needs to be cleaned

# These functions are otherwise only imported further down the notebook; import
# them here so the paragraph also works in a top-to-bottom run.
from pyspark.sql.functions import col, regexp_extract

# First run of four digits in publish_date; an empty string when there is none.
cleaned_df = cleaned_df.withColumn('publish_year', regexp_extract('publish_date', r'(\d{4})', 1))

# publish_year is a string, so these are string comparisons; for four-digit
# values they order the same way as the numbers would.
# number_of_pages is resolved by name on the frame being filtered, not taken
# from the separate raw `df`.
base_df = cleaned_df.filter(
    (col('publish_year') >= '1950')
    & (col('publish_year') <= '2022')
    & (col('title') != '')
    & (col('number_of_pages') > 0)
)

In [23]:
%pyspark
# Number of rows in base_df.
base_df.count()

In [24]:
%pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr, count, to_timestamp, year, regexp_extract, coalesce, from_json, col, when, explode, length, size, lit, current_date, trim, dense_rank, regexp_replace, split, countDistinct, to_date, concat_ws
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.window import Window

In [25]:
%pyspark

## Merge columns with similar values

# (kept column, fallback column): the kept column becomes the first non-null
# value of the pair.
_merge_pairs = [
    ('oclc_number', 'oclc_numbers'),
    ('subject_time', 'subject_times'),
    ('work_title', 'work_titles'),
    ('publish_date', 'first_publish_date'),
]

_merged = base_df
for _kept, _fallback in _merge_pairs:
    _merged = _merged.withColumn(_kept, coalesce(_kept, _fallback))

# `authors` is replaced by its nested key values: authors.key, falling back to
# authors.author.key.
_merged = _merged.withColumn('authors', coalesce('authors.key', 'authors.author.key'))

# The fallback columns are no longer needed once merged.
base_df = _merged.drop('oclc_numbers', 'subject_times', 'work_titles', 'first_publish_date')

In [26]:
%pyspark
# Title with the highest page count. The sort column is resolved by name on the
# projection of base_df rather than borrowed from the separate raw `df`.
base_df.select('title','number_of_pages').orderBy(col('number_of_pages').desc()).limit(1).show(truncate=False)

In [27]:
%pyspark
# Rank titles by page count, highest first, and print up to 9 titles of rank 1.
# NOTE(review): the frame is named "min", but the descending order makes rank 1
# the largest page count - confirm which one is intended.
w = Window.orderBy(col('number_of_pages').desc())
min_page_df = base_df.select('title', 'number_of_pages').withColumn(
    'rank_num_pages', dense_rank().over(w)
)
_top_ranked = min_page_df.where(col('rank_num_pages') == 1)
_top_ranked.select('title', 'number_of_pages').show(9, truncate=False)

In [28]:
# %pyspark
# base_df.registerTempTable("books_info")
# spark.sql("""select title, author,
#                     description,
#                     publish_date,
#                     RANK() OVER (PARTITION BY title
#                     ORDER BY number_of_pages) AS rank from book_info""")

In [29]:
%sh
# Install pandas with the pip module of the `python3` found on this shell's PATH.
python3 -m pip install pandas

In [30]:
%python


import sys

# Extra locations to add to this interpreter's module search path.
_extra_paths = (
    "/usr/local/lib/python3.7/site-packages",
    "/var/lib/zeppelin/.local/bin",
)
for _path in _extra_paths:
    # Re-running the paragraph must not pile up duplicate entries.
    if _path not in sys.path:
        sys.path.append(_path)
print(sys.path)

In [31]:
%sh
# List the packages visible to the pip module of `python3`.
python3 -m pip list

In [32]:
%pyspark

# Sample genre
#         [Outlines, syllabi, etc]

# One row per (title, genre) pair.
_genre_per_title = base_df.select('title', explode('genres').alias('genres'))

# Keep only ASCII letters, digits, commas and spaces, then trim the result.
# NOTE(review): accented letters are removed by this pattern too - presumably
# why labels such as "Ficcin" turn up in the counts; confirm.
_cleaned_genre = trim(regexp_replace('genres', '[^A-Za-z0-9, ]', '')).alias('genres')

_genre_counts = (
    _genre_per_title
    .select('title', _cleaned_genre)
    .groupBy('genres')
    .agg(count('title').alias('book_count'))
    .orderBy(col('book_count').desc())
)
_genre_counts.show(200, truncate=False)

# Observation
#         Multi-lingual text with same information

In [33]:
%sh
# Install the langdetect and googletrans packages for `python3`.
python3 -m pip install langdetect
python3 -m pip install googletrans

In [34]:
// %pyspark

// from langdetect import detect
// print(detect("Ficcin juvenil"))

// from googletrans import Translator, constants
// from pprint import pprint

// translator = Translator()
// translation = translator.translate("Ficcin juvenil")
// print(f"{translation.origin} ({translation.src}) --> {translation.text} ({translation.dest})")

In [35]:
%pyspark

# Build a (title, genres) frame with one row per genre entry. Each genre is
# reduced to ASCII letters, digits, commas and spaces, then trimmed.

_genre_rows = base_df.select('title', explode('genres').alias('genres'))
exploded_genre_df = _genre_rows.select(
    'title',
    trim(regexp_replace('genres', '[^A-Za-z0-9, ]', '')).alias('genres'),
)

In [36]:
%pyspark
# Print the column names and types of exploded_genre_df.
exploded_genre_df.printSchema()

In [37]:
%pyspark
# Map the listed non-English genre labels onto their English counterparts,
# then count titles per genre and keep the five largest groups.
_genre_translations = {
    "Literatura juvenil": "Juvenile literature",
    "Ficcin": "Fiction",
    "Ficcin juvenil": "Juvenile fiction",
    "Humorismo": "Humor",
}

# Build the when(...).when(...) chain from the table; labels that are not in
# the table are kept unchanged by the final otherwise().
_normalised = None
for _label, _english in _genre_translations.items():
    _matches = exploded_genre_df.genres == _label
    if _normalised is None:
        _normalised = when(_matches, _english)
    else:
        _normalised = _normalised.when(_matches, _english)
_normalised = _normalised.otherwise(exploded_genre_df.genres)

top_genre = (
    exploded_genre_df
    .withColumn("genres", _normalised)
    .groupBy('genres')
    .agg(count('title').alias('book_count'))
    .orderBy(col('book_count').desc())
    .limit(5)
)

z.show(top_genre)

In [38]:
%pyspark
# Show the authors value of up to 1000 rows that list more than one author.
base_df.filter(size('authors') > 1).select('authors').show(1000, truncate=False)

In [39]:
%pyspark

# Rows with more than one author, expanded to one row per (title, author).
_multi_author = base_df.where(size('authors') > 1)
co_auth_df = _multi_author.select('title', explode('authors').alias('authors'))

# Five authors with the highest title counts among those rows.
_books_per_author = (
    co_auth_df
    .groupBy('authors')
    .agg(count('title').alias('book_count'))
    .orderBy(col('book_count').desc())
)
_books_per_author.show(5, truncate=False)



In [40]:
%pyspark

# Distinct (publish_date, author) pairs, one row per author entry.
_date_author = base_df.select('publish_date', explode('authors').alias('authors'))
publ_books_df = _date_author.distinct()

# Number of authors per publish date, latest publish_date value first.
_authors_per_date = (
    publ_books_df
    .groupBy('publish_date')
    .agg(count('authors').alias('no_of_authors'))
    .orderBy(col('publish_date').desc())
)
_authors_per_date.show(9999, truncate=False)

In [41]:
%pyspark

# Keep rows whose publish_year lies between 1950 and 1970 (both included),
# expanded to one row per author.
_in_range = (col('publish_year') >= lit('1950')) & (col('publish_year') <= lit('1970'))
books_pub_r_df = base_df.where(_in_range).select(
    explode('authors').alias('authors'),
    'title',
    'publish_year',
    'publish_date',
)

# Publish month: the first capitalised word found in publish_date.
_month_pattern = r'([A-Z][a-z]+)'
books_pub_r_df = books_pub_r_df.withColumn(
    'publish_month', regexp_extract('publish_date', _month_pattern, 1)
)

# Group by "<month> <year>" and count the distinct authors and titles per group.
# NOTE(review): the result is sorted on the combined string, so the order is
# alphabetical by month name rather than chronological - confirm intended.
_month_year = concat_ws(' ', 'publish_month', 'publish_year')
(
    books_pub_r_df
    .groupBy(_month_year.alias('month_year'))
    .agg(countDistinct('authors').alias('authors'), countDistinct('title').alias('books'))
    .orderBy(_month_year)
    .show(99, False)
)