In [1]:
import numpy as np
import pandas as pd
import boto3
import json
import matplotlib.pyplot as plt
import pickle
import re

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import udf
from tqdm.notebook import tqdm
from pyspark.sql.types import *
from pyspark.sql.functions import struct
from pyspark.sql.functions import countDistinct
import os

In [2]:
# load bucket
AVSLS_BUCKET = 'miba-ma-prj-aviasales'

with open('access.json') as file:
    access_data = json.load(file)

In [3]:
# load session
session = boto3.session.Session()
s3 = session.client(
    service_name='s3',
    aws_access_key_id=access_data['aws_access_key_id'],
    aws_secret_access_key=access_data['aws_secret_access_key'],
    endpoint_url='https://hb.bizmrg.com'
)

In [4]:
conf = SparkConf()
conf.set('spark.master', 'local[*]')
conf.set('spark.executor.memory', '16G')
conf.set('spark.driver.memory', '16G')
conf.set('spark.driver.maxResultSize', '16G')
sc = SparkContext(conf=conf)
spark = SparkSession(sc)
spark

In [5]:
spark._jsc.hadoopConfiguration().set('fs.s3a.access.key', access_data['aws_access_key_id'])
spark._jsc.hadoopConfiguration().set('fs.s3a.secret.key', access_data['aws_secret_access_key'])
spark._jsc.hadoopConfiguration().set('fs.s3a.impl','org.apache.hadoop.fs.s3a.S3AFileSystem')
spark._jsc.hadoopConfiguration().set('fs.s3a.multipart.size', '104857600')
spark._jsc.hadoopConfiguration().set('fs.s3a.block.size', '33554432')
spark._jsc.hadoopConfiguration().set('fs.s3a.threads.max', '256')
spark._jsc.hadoopConfiguration().set('fs.s3a.endpoint', 'https://hb.bizmrg.com')

# Processing verticals

In [45]:
verticals = pd.read_csv("verticals.csv", delimiter=";").dropna(subset=['brand']).fillna(0)
verticals

Unnamed: 0.1,Unnamed: 0,brand,Primary sector,Flights,Buses,Trains,Car Rentals,Transfers,Water transport,Hotels,...,Banking,SIM-Cards,Loyalty programs,Parking,Legal Services,Accessories,Information,Aggregator,Not travel,Other
0,0,12go,Aggregator,1.0,1.0,1.0,1.0,1.0,1.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0
1,1,12-travel,Aggregator,1.0,0.0,0.0,1.0,0.0,0.0,1.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0
2,2,1800hotels,Aggregator,0.0,0.0,0.0,0.0,0.0,0.0,1.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0
3,3,5pm,Aggregator,0.0,0.0,0.0,0.0,0.0,0.0,1.0,...,0.0,0.0,0.0,0.0,0.0,1.0,0.0,1.0,0.0,0.0
4,4,5vorflug,Aggregator,1.0,0.0,0.0,1.0,0.0,0.0,1.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
3372,3372,houseofbritain,Tours and Activities,0.0,0.0,1.0,1.0,0.0,1.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
3373,3373,esim,Other,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0
3374,3374,litzdance,Shopping,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
3375,3375,wilsumerberge,Camping,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0


In [27]:
verticals.columns[2]

'Flights'

In [24]:
for i in range(len(verticals)):
    primary_sector = verticals.iloc[i][1]
    verticals.at[i, primary_sector] = 1

In [25]:
verticals.to_csv('verticals.csv')

# Searching combinations

In [28]:
verticals = pd.read_excel('travel_verticals/verticals words.xlsx', usecols=['brand', 'Primary sector'])
verticals = verticals.loc[verticals['Primary sector'] != 'Not travel'].reset_index(drop=True)
verticals

Unnamed: 0,brand,Primary sector
0,travel-dealz,Aggregator
1,kkday,Aggregator
2,12go,Aggregator
3,12-travel,Aggregator
4,1800hotels,Aggregator
...,...,...
3148,sealink,Water transport
3149,stenaline,Water transport
3150,wightlink,Water transport
3151,balearia eu,Water transport


In [30]:
verticals = spark.createDataFrame(verticals)

In [34]:
verticals.count()

3153

In [32]:
df = spark.read.csv(f'shared/avs/dataset.csv', sep=',', header=True).select(['brand','refdomain'])
df.limit(5).toPandas()

Unnamed: 0,brand,refdomain
0,hotels,gocouponcodes.com
1,hotels,thehotelreservations.com
2,hotels,toliveanddine.com
3,hotels,realfamilytrips.com
4,hotels,vibafima.com


In [33]:
joined = df.join(verticals, df.brand == verticals.brand, how='left').select(['refdomain','Primary sector']).dropDuplicates(['refdomain', 'Primary sector'])


#joined = joined.filter(df.advertiser != 'dolcevita.com')
joined.limit(10).toPandas()

Unnamed: 0,refdomain,Primary sector
0,airoocorp.blogspot.com,Aggregator
1,rmn.ph,Aggregator
2,journeyofanomadicfamily.com,Aggregator
3,aviatse.kz,Aggregator
4,travelfeeddiaries.com,Aggregator
5,bestlifevision.com,Aggregator
6,thecashbackdad.com,Aggregator
7,20200217t134104-dot-almowafir.appspot.com,Aggregator
8,oneworld-7.blogspot.com,Flights
9,texasroundpen.com,Loyalty programs


In [74]:
df = joined.toPandas()

In [38]:
test1 = df[df.refdomain == '043startpagina.nl']
test2 = df[df.refdomain == '043startpagina.nl']

In [40]:
test1

Unnamed: 0,refdomain,Primary sector
26173,043startpagina.nl,Vacation Rentals
70529,043startpagina.nl,Shopping


In [41]:
test3 = pd.merge(test1, test2, on='refdomain')
test3 = test3[(test3['Primary sector_x'] != test3['Primary sector_y'])]
test3

Unnamed: 0,refdomain,Primary sector_x,Primary sector_y
1,043startpagina.nl,Vacation Rentals,Shopping
2,043startpagina.nl,Shopping,Vacation Rentals


In [75]:
unique_affs = list(set((df.refdomain)))
combinations = pd.DataFrame()    
                   
for i in range(len(unique_affs)):
    aff1 = df[df.refdomain == unique_affs[i]]
    aff2 = df[df.refdomain == unique_affs[i]]
    join = pd.merge(aff1, aff2, on='refdomain')
    join = join[(join['Primary sector_x'] != join['Primary sector_y'])]
    combinations = combinations.append(join)

In [76]:
combinations.to_csv('combinations.csv', index=False)

# Searching combinations - Aviasales verticals

In [77]:
verticals = pd.read_excel('travel_verticals/verticals words - Aviasales.xlsx', usecols=['brand', 'Primary sector'])
verticals = verticals.loc[verticals['Primary sector'] != 'Not travel'].reset_index(drop=True)
verticals

Unnamed: 0,brand,Primary sector
0,travel-dealz,Aggregator
1,kkday,Tours and Activities
2,12go,Aggregator
3,12-travel,Package tours
4,1800hotels,Other
...,...,...
3148,sealink,Water transport
3149,stenaline,Water transport
3150,wightlink,Water transport
3151,balearia eu,Water transport


In [78]:
verticals = spark.createDataFrame(verticals)

In [79]:
df = spark.read.csv(f'shared/avs/dataset.csv', sep=',', header=True).select(['brand','refdomain'])
df.limit(5).toPandas()

Unnamed: 0,brand,refdomain
0,hotels,gocouponcodes.com
1,hotels,thehotelreservations.com
2,hotels,toliveanddine.com
3,hotels,realfamilytrips.com
4,hotels,vibafima.com


In [80]:
joined = df.join(verticals, df.brand == verticals.brand, how='left').select(['refdomain','Primary sector']).dropDuplicates(['refdomain', 'Primary sector'])


#joined = joined.filter(df.advertiser != 'dolcevita.com')
joined.limit(10).toPandas()

Unnamed: 0,refdomain,Primary sector
0,theatlasedit.com,Tours and Activities
1,cheaptripdeals.net,Tours and Activities
2,erfahrungenscout.de,Tours and Activities
3,thefeelingremains.com,Flights
4,youtraveldeals.com,Flights
5,tripperstale.com,Flights
6,globalaffiliateprograms.co.uk,Flights
7,gobot.com,Flights
8,chasingwhereabouts.com,Flights
9,foradazonadeconforto.com,Flights


In [81]:
df = joined.toPandas()

In [82]:
test1 = df[df.refdomain == '043startpagina.nl']
test2 = df[df.refdomain == '043startpagina.nl']

In [83]:
test1

Unnamed: 0,refdomain,Primary sector
30355,043startpagina.nl,Vacation Rentals
82315,043startpagina.nl,Shopping


In [84]:
test3 = pd.merge(test1, test2, on='refdomain')
test3 = test3[(test3['Primary sector_x'] != test3['Primary sector_y'])]
test3

Unnamed: 0,refdomain,Primary sector_x,Primary sector_y
1,043startpagina.nl,Vacation Rentals,Shopping
2,043startpagina.nl,Shopping,Vacation Rentals


In [85]:
unique_affs = list(set((df.refdomain)))
combinations = pd.DataFrame()    
                   
for i in range(len(unique_affs)):
    aff1 = df[df.refdomain == unique_affs[i]]
    aff2 = df[df.refdomain == unique_affs[i]]
    join = pd.merge(aff1, aff2, on='refdomain')
    join = join[(join['Primary sector_x'] != join['Primary sector_y'])]
    combinations = combinations.append(join)

In [86]:
combinations.to_csv('combinations - Aviasales.csv', index=False)

In [16]:
combinations = pd.read_csv('combinations - aviasales.csv')

In [17]:
combinations.head(50)

Unnamed: 0,refdomain,Primary sector_x,Primary sector_y
0,xgratis.nl,Flights,Information
1,xgratis.nl,Information,Flights
2,nosnatrip.com.br,Car Rentals,Hotels
3,nosnatrip.com.br,Hotels,Car Rentals
4,bestemmingaustralie.nl,Package tours,Shopping
5,bestemmingaustralie.nl,Shopping,Package tours
6,10cose.it,Hotels,Tours and Activities
7,10cose.it,Tours and Activities,Hotels
8,onlinewebshop.net,Hotels,Flights
9,onlinewebshop.net,Hotels,Aggregator


In [194]:
joined.count()

108380

# Joining datasets with travel domains

In [9]:
verticals = spark.read.csv('razmetka.csv', sep=';', header=True).select(['brand', 'Primary sector'])
verticals.limit(10).toPandas()

Unnamed: 0,brand,Primary sector
0,12go,Aggregator
1,12-travel,Aggregator
2,1800hotels,Aggregator
3,5pm,Aggregator
4,5vorflug,Aggregator
5,ab-in-den-urlaub,Aggregator
6,actievandedag,Aggregator
7,aanzee,Aggregator
8,abay,Aggregator
9,affordabletours,Aggregator


In [10]:
df = spark.read.csv(f'shared/avs/final_data.csv', sep=',', header=True).select(['brand','refdomain'])
df.limit(5).toPandas()

Unnamed: 0,brand,refdomain
0,agoda,chandays.blogspot.com
1,agoda,promotionhotels.blogspot.com
2,agoda,52.77.63.17
3,agoda,romantichotelsinuk.blogspot.com
4,agoda,bestbookinghotelsofasia.blogspot.com


In [11]:
joined = df.join(verticals, df.brand == verticals.brand, how='left').select(['refdomain','Primary sector']).dropDuplicates(['refdomain', 'Primary sector'])


#joined = joined.filter(df.advertiser != 'dolcevita.com')
joined.limit(10).toPandas()

Unnamed: 0,refdomain,Primary sector
0,irisharchaeologicalresearch.blogspot.com,Aggregator
1,daydaytravel.hk,Aggregator
2,k-trekkingtourthailand.blogspot.com,Aggregator
3,agodahotels94.blogspot.com,Aggregator
4,taiwan106.blogspot.com,Aggregator
5,backpacker8888.blogspot.com,Aggregator
6,choicehotelsonlinereservation.blogspot.com,Aggregator
7,nfljerseyswholesale65.blogspot.com,Aggregator
8,hoteldealshouston.blogspot.com,Aggregator
9,pinterior.design,Aggregator


In [12]:
df = joined.toPandas()

In [17]:
df['Count of refdomain'] = 1

In [20]:
df2 = df.pivot_table(values='Count of refdomain', index='refdomain', columns=['Primary sector'], aggfunc=np.sum)
df2.head()

Primary sector,Aggregator,Buses,Camping,Car Rentals,Cruises,Financial services,Flights,Food & Beverage,Hostels,Hotels,...,Other,Outdoors,Package tours,Parking,Shopping,Tours and Activities,Trains,Transfers,Vacation Rentals,Water transport
refdomain,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
000webhostapp.com,1.0,,,,,,,,,1.0,...,,,,,,,,,,
00580.com,,,,,,,1.0,,,,...,,,,,,,,,,
007museum.com,1.0,,,,,,,,,1.0,...,,,,,,,,,,
007tour.ru,1.0,,,,,,,,,,...,,,,,,,,,,
007travelers.blogspot.com,1.0,,,,,,,,,,...,,,,,,,,,,


In [21]:
df2.fillna(0,inplace=True)
df2.head()

Primary sector,Aggregator,Buses,Camping,Car Rentals,Cruises,Financial services,Flights,Food & Beverage,Hostels,Hotels,...,Other,Outdoors,Package tours,Parking,Shopping,Tours and Activities,Trains,Transfers,Vacation Rentals,Water transport
refdomain,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
000webhostapp.com,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
00580.com,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
007museum.com,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
007tour.ru,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
007travelers.blogspot.com,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0


In [22]:
column1_list = ['Aggregator', 'Buses', 'Camping', 'Car Rentals', 'Cruises',
       'Financial services', 'Flights', 'Food & Beverage', 'Hostels', 'Hotels',
       'Information', 'Insurance', 'Legal Services', 'Loyalty programs',
       'Other', 'Outdoors', 'Package tours', 'Parking', 'Shopping',
       'Tours and Activities', 'Trains', 'Transfers', 'Vacation Rentals',
       'Water transport']
column2_list = ['Aggregator', 'Buses', 'Camping', 'Car Rentals', 'Cruises',
       'Financial services', 'Flights', 'Food & Beverage', 'Hostels', 'Hotels',
       'Information', 'Insurance', 'Legal Services', 'Loyalty programs',
       'Other', 'Outdoors', 'Package tours', 'Parking', 'Shopping',
       'Tours and Activities', 'Trains', 'Transfers', 'Vacation Rentals',
       'Water transport']

dict = {}
for i in column1_list:
    for t in column2_list:
        if i != t:
            df3 = df2[[i,t]]
            df3 = df3.groupby([i,t]).size().reset_index().rename(columns={0:'count'})
            df3 = df3[~(df3 == 0).any(axis=1)]
            comb_name = i + "-" + t
            if comb_name in dict and len(df3) != 0:
                dict[comb_name] = dict[comb_name] + df3['count'].iloc[0]
            elif comb_name not in dict and len(df3) != 0:
                dict[comb_name] = df3['count'].iloc[0]
            else:
                dict[comb_name] = 0
        else:
            continue

In [31]:
print(dict.)

{'Aggregator-Buses': 888, 'Aggregator-Camping': 105, 'Aggregator-Car Rentals': 1606, 'Aggregator-Cruises': 258, 'Aggregator-Financial services': 99, 'Aggregator-Flights': 3391, 'Aggregator-Food & Beverage': 36, 'Aggregator-Hostels': 16, 'Aggregator-Hotels': 3726, 'Aggregator-Information': 1657, 'Aggregator-Insurance': 613, 'Aggregator-Legal Services': 368, 'Aggregator-Loyalty programs': 627, 'Aggregator-Other': 131, 'Aggregator-Outdoors': 69, 'Aggregator-Package tours': 3732, 'Aggregator-Parking': 492, 'Aggregator-Shopping': 466, 'Aggregator-Tours and Activities': 2049, 'Aggregator-Trains': 308, 'Aggregator-Transfers': 477, 'Aggregator-Vacation Rentals': 1212, 'Aggregator-Water transport': 163, 'Buses-Aggregator': 888, 'Buses-Camping': 14, 'Buses-Car Rentals': 196, 'Buses-Cruises': 67, 'Buses-Financial services': 3, 'Buses-Flights': 332, 'Buses-Food & Beverage': 12, 'Buses-Hostels': 10, 'Buses-Hotels': 324, 'Buses-Information': 175, 'Buses-Insurance': 51, 'Buses-Legal Services': 55, 'B

In [25]:
# все комбинации
for i in dict:
    print(i, dict[i])

Aggregator-Buses 888
Aggregator-Camping 105
Aggregator-Car Rentals 1606
Aggregator-Cruises 258
Aggregator-Financial services 99
Aggregator-Flights 3391
Aggregator-Food & Beverage 36
Aggregator-Hostels 16
Aggregator-Hotels 3726
Aggregator-Information 1657
Aggregator-Insurance 613
Aggregator-Legal Services 368
Aggregator-Loyalty programs 627
Aggregator-Other 131
Aggregator-Outdoors 69
Aggregator-Package tours 3732
Aggregator-Parking 492
Aggregator-Shopping 466
Aggregator-Tours and Activities 2049
Aggregator-Trains 308
Aggregator-Transfers 477
Aggregator-Vacation Rentals 1212
Aggregator-Water transport 163
Buses-Aggregator 888
Buses-Camping 14
Buses-Car Rentals 196
Buses-Cruises 67
Buses-Financial services 3
Buses-Flights 332
Buses-Food & Beverage 12
Buses-Hostels 10
Buses-Hotels 324
Buses-Information 175
Buses-Insurance 51
Buses-Legal Services 55
Buses-Loyalty programs 62
Buses-Other 26
Buses-Outdoors 24
Buses-Package tours 301
Buses-Parking 64
Buses-Shopping 48
Buses-Tours and Activitie

In [14]:
test1 = df[df.refdomain == '043startpagina.nl']
test2 = df[df.refdomain == '043startpagina.nl']

In [15]:
test2

Unnamed: 0,refdomain,Primary sector
46417,043startpagina.nl,Not travel
61539,043startpagina.nl,Shopping
77646,043startpagina.nl,Financial services


In [36]:
test3 = pd.merge(test1, test2, on='refdomain')
test3 = test3[(test3['Primary sector_x'] != test3['Primary sector_y'])]
test3

Unnamed: 0,refdomain,Primary sector_x,Primary sector_y
1,043startpagina.nl,Not travel,Shopping
2,043startpagina.nl,Not travel,Financial services
3,043startpagina.nl,Shopping,Not travel
5,043startpagina.nl,Shopping,Financial services
6,043startpagina.nl,Financial services,Not travel
7,043startpagina.nl,Financial services,Shopping


In [None]:
unique_affs = list(set((df.refdomain)))
combinations = pd.DataFrame()    
                   
for i in range(len(unique_affs)):
    aff1 = df[df.refdomain == unique_affs[i]]
    aff2 = df[df.refdomain == unique_affs[i]]
    join = pd.merge(aff1, aff2, on='refdomain')
    join = join[(join['Primary sector_x'] != join['Primary sector_y'])]
    combinations = combinations.append(join)

In [None]:
combinations.to_csv('combinations.csv', index=False)

In [6]:
combinations = pd.read_csv('combinations.csv')

In [8]:
combinations.head(50)

Unnamed: 0,refdomain,Primary sector_x,Primary sector_y
0,tickethulp.nl,Aggregator,Flights
1,tickethulp.nl,Flights,Aggregator
2,lesbonsplansdelaurent.com,Aggregator,Flights
3,lesbonsplansdelaurent.com,Flights,Aggregator
4,eqla3.com,Aggregator,Flights
5,eqla3.com,Flights,Aggregator
6,allesoverplayadelasamericas.nl,Package tours,Aggregator
7,allesoverplayadelasamericas.nl,Aggregator,Package tours
8,start4all.com,Package tours,Aggregator
9,start4all.com,Package tours,Hotels


In [194]:
joined.count()

108380

# S3 writting

In [197]:
files = ['2lka.net.csv',
'7eer.net.csv',
'admitad.com.csv',
'affilired.com.csv',
'anrdoezrs.net.csv',
'avantlink.com.csv',
'awin1.com.csv',
'cityads.csv',
'click.linksynergy.com.csv',
'dpbolvw.net.csv',
'evyy.net.csv',
'go.skimresources.com.csv',
'jdoqocy.com.csv',
'kqzyfj.com.csv',
'ojrq.net.csv',
'prf.hn.csv',
'pxf.io.csv',
'sjv.io.csv',
'tc.tradetracker.net.csv',
'tradedoubler.com.csv',
'viglink.com.csv',
]

In [198]:
for i in files:
    path_file_upload = f'final/{i}'
    path_file_s3 = f'work/network_affiliates_final/{i}'
    s3.upload_file(path_file_upload, AVSLS_BUCKET, path_file_s3)

In [199]:
bucket = AVSLS_BUCKET
prefix = 'work/network_affiliates_final'
for obj in s3.list_objects_v2(Bucket=bucket, Prefix=prefix)['Contents']:
    print(obj['Key'] + ': ' + str(obj['Size']))

work/network_affiliates_final/2lka.net.csv: 43718
work/network_affiliates_final/7eer.net.csv: 124175399
work/network_affiliates_final/admitad.com.csv: 61416045
work/network_affiliates_final/affilired.com.csv: 167129
work/network_affiliates_final/anrdoezrs.net.csv: 25566796
work/network_affiliates_final/avantlink.com.csv: 2811463
work/network_affiliates_final/awin1.com.csv: 41934357
work/network_affiliates_final/cityads.csv: 104411
work/network_affiliates_final/click.linksynergy.com.csv: 4972954
work/network_affiliates_final/dpbolvw.net.csv: 10237650
work/network_affiliates_final/evyy.net.csv: 469105
work/network_affiliates_final/go.skimresources.com.csv: 41872521
work/network_affiliates_final/jdoqocy.com.csv: 18892793
work/network_affiliates_final/kqzyfj.com.csv: 15317560
work/network_affiliates_final/ojrq.net.csv: 5352
work/network_affiliates_final/prf.hn.csv: 213415829
work/network_affiliates_final/pxf.io.csv: 557993
work/network_affiliates_final/sjv.io.csv: 115247
work/network_affil

In [201]:
total = 0
for obj in s3.list_objects_v2(Bucket=bucket, Prefix=prefix)['Contents']:
    sdf = spark.read.csv(f's3a://miba-ma-prj-aviasales/{obj["Key"]}', sep=',', header=True)
    length = sdf.count()
    total += length
    print(f'{obj["Key"].split("/")[2]}: {length}')
print(' ')
print(f'Total links: {total}')

2lka.net.csv: 358
7eer.net.csv: 456855
admitad.com.csv: 134693
affilired.com.csv: 972
anrdoezrs.net.csv: 115670
avantlink.com.csv: 8171
awin1.com.csv: 241457
cityads.csv: 471
click.linksynergy.com.csv: 18761
dpbolvw.net.csv: 50433
evyy.net.csv: 1941
go.skimresources.com.csv: 137795
jdoqocy.com.csv: 73443
kqzyfj.com.csv: 75087
ojrq.net.csv: 25
prf.hn.csv: 857715
pxf.io.csv: 2591
sjv.io.csv: 572
tc.tradetracker.net.csv: 996999
tradedoubler.com.csv: 1169251
viglink.com.csv: 108380
 
Total links: 4451640


In [204]:
df = spark.read.csv(f's3a://miba-ma-prj-aviasales/work/network_affiliates_final/viglink.com.csv', sep=',', header=True).drop('brand')
df.limit(50).toPandas()

Unnamed: 0,refdomain,ahrefs_rank,domain_rating,ahrefs_top,links_internal,links_external,language,url_to,last_visited,linked_root_domains,traffic,advertiser
0,gopromocodes.herokuapp.com,0,0,166238526,69,31,en,http://redirect.viglink.com/?u=http%3A%2F%2Fls...,2020-10-25T18:16:15Z,4,0.0,travelation.com
1,gopromocodes.herokuapp.com,0,0,166238526,69,31,en,http://redirect.viglink.com/?u=http%3A%2F%2Fls...,2020-10-25T18:16:15Z,4,0.0,travelation.com
2,gopromocodes.herokuapp.com,0,0,166238526,69,31,en,http://redirect.viglink.com/?u=http%3A%2F%2Fls...,2020-10-25T18:16:15Z,4,0.0,travelation.com
3,20dollaradayvacationspots.blogspot.com,0,0,0,6,73,en,https://redirect.viglink.com/?u=https%3A%2F%2F...,2020-05-13T09:13:51Z,16,0.0,busbud.com
4,20dollaradayvacationspots.blogspot.com,0,0,0,6,73,en,https://redirect.viglink.com/?u=https%3A%2F%2F...,2020-05-13T09:13:51Z,16,0.0,busbud.com
5,couponcode.guru,0,0,120357897,26,5,en,https://redirect.viglink.com/?u=https://www.su...,2020-05-20T08:49:24Z,1,5e-06,suntransfers.com
6,couponcode.guru,0,0,120357897,26,5,en,https://redirect.viglink.com/?u=https://www.su...,2020-05-20T08:49:24Z,1,5e-06,suntransfers.com
7,blog-makanandikualalumpur.blogspot.com,0,0,115661013,70,18,id,http://redirect.viglink.com/?u=http%3A%2F%2Fww...,2020-10-30T16:06:47Z,7,0.0,streetdirectory.com.my
8,site-reviews.info,0,0,106425445,88,2,en,https://redirect.viglink.com/?u=http%3A%2F%2Fk...,2020-11-02T02:19:19Z,2,0.0,kiwi.com
9,site-reviews.info,0,0,106425445,88,2,en,https://redirect.viglink.com/?u=http%3A%2F%2Fk...,2020-11-02T02:19:19Z,2,0.0,kiwi.com
