# ACMA Scrapper
##### ©Haris Hassan

## Libraries

In [None]:
##======================##
## ACMAscrapper V 6.1.2 ##
##======================##
##
## Scraper for the Register of Radiocommunications Licences (RRL) of the Australian Communications and Media Authority (ACMA)
# 
# Author Haris Hassan
# Email hharis11@hotmail.com
# linkedin https://www.linkedin.com/in/hassanharis/
#
##=============================================================================
# Import libraries

import re
import time
import requests
import random
import string
from bs4 import BeautifulSoup
import pandas as pd
pd.set_option('display.max_colwidth', None)
import cProfile, pstats


## User Input

In [None]:
#### Replace Site
# ACMA site ID to scrape; it is appended to the site lookup URL as pSITE_ID.
sitecode = 9892

##### Filters
# Keep only assignments whose 'T/R' column is 'T'; 'Receiver' devices are also
# dropped from the antenna table when this is enabled.
TRANSMITTER_ONLY = False

# Keep only assignments whose 'Client' equals THIS_CLIENT_ONLY_NAME exactly.
THIS_CLIENT_ONLY = False
THIS_CLIENT_ONLY_NAME = ''

# Drop assignments whose 'Client' equals IGNORE_CLIENT_NAME exactly.
IGNORE_CLIENT = False
IGNORE_CLIENT_NAME = ''

# When enabled, filter_assignments keeps only frequencies below MIN_FREQ or
# above MAX_FREQ (both in MHz), i.e. it removes the MIN_FREQ..MAX_FREQ band.
FREQUENCY_FILTER = False
MIN_FREQ = 650
MAX_FREQ = 3800

# get_ipython() only exists inside IPython/Jupyter/Colab. Guard the lookup so
# the file can also be executed as a plain Python script without a NameError.
try:
    _shell_description = str(get_ipython())
except NameError:
    _shell_description = ''

if 'google.colab' in _shell_description:
    print('Running on CoLab')
    running_in_colab = True
else:
    google_sheet_url = False
    running_in_colab = False

## Functions

In [None]:
def get_random_user_agent():
    """Return one User-Agent string picked at random from a fixed pool.

    The pool is a hard-coded collection of browser User-Agent strings; the
    pick is made with ``random.choice``.
    """
    candidate_agents = (
        'Mozilla/5.0 (Linux; Android 12; SM-S906N Build/QP1A.190711.020; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/80.0.3987.119 Mobile Safari/537.36',
        'Mozilla/5.0 (Linux; Android 10; SM-G996U Build/QP1A.190711.020; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Mobile Safari/537.36',
        'Mozilla/5.0 (Linux; Android 10; SM-G980F Build/QP1A.190711.020; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/78.0.3904.96 Mobile Safari/537.36',
        'Mozilla/5.0 (Linux; Android 9; SM-G973U Build/PPR1.180610.011) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.100 Mobile Safari/537.36',
        'Mozilla/5.0 (Linux; Android 8.0.0; SM-G960F Build/R16NW) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.84 Mobile Safari/537.36',
        'Mozilla/5.0 (Linux; Android 12; Pixel 6 Build/SD1A.210817.023; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/94.0.4606.71 Mobile Safari/537.36',
        'Mozilla/5.0 (Linux; Android 11; Pixel 5 Build/RQ3A.210805.001.A1; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/92.0.4515.159 Mobile Safari/537.36',
        'Mozilla/5.0 (iPhone14,3; U; CPU iPhone OS 15_0 like Mac OS X) AppleWebKit/602.1.50 (KHTML, like Gecko) Version/10.0 Mobile/19A346 Safari/602.1',
        'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/47.0.2526.111 Safari/537.36',
        'Mozilla/5.0 (Linux; Android 5.0.2; SAMSUNG SM-T550 Build/LRX22G) AppleWebKit/537.36 (KHTML, like Gecko) SamsungBrowser/3.3 Chrome/38.0.2125.102 Safari/537.36',
        'Mozilla/5.0 (Linux; Android 7.0; SM-T827R4 Build/NRD90M) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3112.116 Safari/537.36',
        'Mozilla/5.0 (Linux; Android 7.0; Pixel C Build/NRD90M; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/52.0.2743.98 Safari/537.36',
        'Mozilla/5.0 (Linux; Android 11; Lenovo YT-J706X) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.45 Safari/537.36',
        'Mozilla/5.0 (Linux; Android 12; SM-X906C Build/QP1A.190711.020; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/80.0.3987.119 Mobile Safari/537.36',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/42.0.2311.135 Safari/537.36 Edge/12.246',
    )
    chosen_agent = random.choice(candidate_agents)
    return chosen_agent

def initialize_acma_site(soup):
    """Build a dictionary of site details from the site's detail table.

    The first table returned by ``scrape_table`` supplies the keys (first
    column) and values (second column). The 'Location' entry has every run
    of whitespace collapsed to a single space.

    Raises KeyError if the table has no 'Location' row.
    """
    titles, details = scrape_table(webpage=soup)
    site_details = {title: details[position] for position, title in enumerate(titles)}
    location_words = site_details['Location'].split()
    site_details['Location'] = ' '.join(location_words)
    return site_details
    

def scrape_page(url, timeout=30):
    """Fetch *url* with the shared ``session`` and return the parsed page.

    Args:
        url: Absolute URL to download.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` can block forever on a stalled connection,
            which would hang the whole scrape.

    Returns:
        A ``BeautifulSoup`` object parsed with the 'lxml' parser, or ``None``
        when the request fails, times out, or the response status is not 200.
        Failures are reported with ``print`` rather than raised.
    """
    try:
        response = session.get(url, timeout=timeout)
    except requests.exceptions.RequestException as e:
        # Timeouts are RequestException subclasses, so they land here too.
        print(f"Error: {str(e)}")
        return None

    if response.status_code != 200:
        print(f"Error: Failed to fetch data from {url}")
        return None
    return BeautifulSoup(response.text, 'lxml')

def filter_assignments(assignments, TRANSMITTER_ONLY, THIS_CLIENT_ONLY, 
                       THIS_CLIENT_ONLY_NAME, IGNORE_CLIENT, IGNORE_CLIENT_NAME,
                      FREQUENCY_FILTER, MAX_FREQ, MIN_FREQ):
    """Filter the assignment table by type, client and frequency.

    A 'Frequency (MHz)' column is added to *assignments* itself (the caller's
    DataFrame is modified), then each enabled filter narrows the result:

    * TRANSMITTER_ONLY: keep rows whose 'T/R' is 'T'.
    * THIS_CLIENT_ONLY: keep rows whose 'Client' equals THIS_CLIENT_ONLY_NAME.
    * IGNORE_CLIENT: drop rows whose 'Client' equals IGNORE_CLIENT_NAME.
    * FREQUENCY_FILTER: keep rows whose frequency is below MIN_FREQ or above
      MAX_FREQ, i.e. rows inside MIN_FREQ..MAX_FREQ are removed.
      NOTE(review): confirm that excluding the band (rather than keeping it)
      is the intended behaviour.

    Returns the filtered DataFrame; with no filter enabled this is the same
    object as *assignments*.
    """
    assignments['Frequency (MHz)'] = frequency_to_mhz(assignments['Frequency'])
    kept = assignments

    if TRANSMITTER_ONLY:
        is_transmitter = kept['T/R'].eq('T')
        kept = kept[is_transmitter]

    if THIS_CLIENT_ONLY:
        is_wanted_client = kept['Client'].eq(THIS_CLIENT_ONLY_NAME)
        kept = kept[is_wanted_client]

    if IGNORE_CLIENT:
        is_other_client = kept['Client'].ne(IGNORE_CLIENT_NAME)
        kept = kept[is_other_client]

    if FREQUENCY_FILTER:
        mhz = kept['Frequency (MHz)']
        below_band = mhz.lt(MIN_FREQ)
        above_band = mhz.gt(MAX_FREQ)
        kept = kept[below_band | above_band]

    return kept

# A leading number followed by a frequency unit, e.g. "1.8 GHz" or "450 kHz".
_FREQUENCY_PATTERN = re.compile(r'^\s*([-+]?(?:\d+(?:\.\d*)?|\.\d+))\s*(GHz|MHz|kHz|KHz|Hz)')

# Multiplier that converts a value in the given unit to MHz.
_MHZ_PER_UNIT = {'GHz': 1000.0, 'MHz': 1.0, 'kHz': 0.001, 'KHz': 0.001, 'Hz': 0.000001}


def frequency_to_mhz(FREQ):
    """Convert frequency strings such as '1.8 GHz' or '900 MHz' to MHz floats.

    Args:
        FREQ: Iterable of frequency strings (for example a DataFrame column).

    Returns:
        A list with exactly one float per input item, in the same order.
        Items that cannot be parsed (blank text, an unknown unit, a non-string
        value) become NaN instead of being dropped, so the result can always
        be assigned back as a column of the same length.
    """
    frequencies = []
    for raw_value in FREQ:
        match = _FREQUENCY_PATTERN.match(raw_value) if isinstance(raw_value, str) else None
        if match is None:
            # Keep the slot so the output stays aligned with the input rows.
            frequencies.append(float('nan'))
            continue
        number, unit = match.groups()
        frequencies.append(float(number) * _MHZ_PER_UNIT[unit])
    return frequencies

def get_all_assignments_at_this_Site(soup):
    """Collect every assignment listed for the site into one DataFrame.

    The column names are read from the header cells of the
    'tablelist responsive' table on *soup*; ``scrape_assignments_pages`` then
    fills the rows, starting from *soup*. The result has a fresh 0..n-1 index.
    """
    first_table = soup.find('table', {"class": "tablelist responsive"})
    column_names = []
    for header_cell in first_table.select('th'):
        column_names.append(header_cell.text.strip())

    empty_frame = pd.DataFrame(columns=column_names)
    collected = scrape_assignments_pages(empty_frame, soup)
    return collected.reset_index(drop=True)

def scrape_assignments_pages(assignments_data_main, assignments_page):
    """Scrape the assignment table on *assignments_page* and all following pages.

    Each page's 'tablelist responsive' table is turned into rows (one per
    ``<tr>`` after the header row) with an extra 'links' column holding the
    href of the first anchor in the row. Pages are followed through the
    anchor titled "Next Page" until there is none.

    Args:
        assignments_data_main: DataFrame that the scraped rows are appended to.
        assignments_page: Parsed first page, or ``None``.

    Returns:
        The accumulated DataFrame. If a page is ``None`` (for example because
        a next page failed to download) or has no assignment table, the rows
        collected so far are returned instead of being discarded.
    """
    page = assignments_page
    visited_links = set()

    # Iterate rather than recurse so long result lists cannot hit the
    # recursion limit and a failed page cannot turn the result into None.
    while page:
        table = page.find('table', {"class": "tablelist responsive"})
        if table is None:
            print("Error: assignments table not found on page; keeping rows collected so far")
            break

        headers = [th.text.strip() for th in table.select('th')]
        page_data = pd.DataFrame(columns=headers)
        page_links = []
        for row in table.find_all('tr')[1:]:
            page_data.loc[len(page_data)] = [cell.text.strip() for cell in row.find_all('td')]
            page_links.append(row.find_all('a')[0].get('href'))
        page_data.insert(1, 'links', page_links)
        assignments_data_main = pd.concat([assignments_data_main, page_data], axis=0)

        # Check if there's another page. Only the first "Next Page" anchor is
        # used: a page may repeat the pager, and joining several hrefs would
        # build an invalid URL.
        next_hrefs = [anchor.get('href') for anchor in page.find_all('a', {'title': "Next Page"})]
        next_hrefs = [href for href in next_hrefs if href and href.strip()]
        if not next_hrefs:
            break

        next_page_link = 'https://web.acma.gov.au' + next_hrefs[0]
        if next_page_link in visited_links:
            # Guard against a pager that links back to an already-read page.
            break
        visited_links.add(next_page_link)

        print(next_page_link)
        page = scrape_page(next_page_link)

    return assignments_data_main
    
def start_Session():
    """
    Create a ``requests.Session`` so the underlying TCP connection is reused
    across requests, asking for gzip-compressed responses and sending a
    randomly selected User-Agent header.
    """
    new_session = requests.Session()
    new_session.headers.update({
        'Accept-Encoding': 'gzip',
        'User-Agent': get_random_user_agent(),
    })
    return new_session

def scrape_table(webpage, instance = 1):
    """
    Return the first column of a table as a header list and its second
    column as a value list.

    Args:
        webpage: Parsed page (or element) to search.
        instance: 1-based position used in the CSS selector
            ``table:nth-of-type(<instance>)``.

    Returns:
        ``(headers, values)``: the text of every first-column cell that has
        no ``colspan`` attribute, and the stripped text of every
        second-column cell.

    Raises:
        AttributeError: if no table matches the selector.
    """
    # The selector alone picks the table. A second positional argument to
    # select_one() is the CSS *namespaces* mapping, not an attribute filter,
    # so the {"class": "tabledetail"} dict formerly passed here never
    # restricted the match and has been dropped.
    table = webpage.select_one('table:nth-of-type(' + str(instance) + ')')
    table_headers = [td.text for td in table.select('td:nth-of-type(1)') if not td.has_attr('colspan')]
    table_Values = [td.text.strip() for td in table.select('td:nth-of-type(2)')]
    return table_headers, table_Values

def find_link_destination(LinkedAssignmentsTable, site_location=None):
    """
    Return the destination site name(s) listed in an antenna page's linked
    assignments table, or 'N/A' when there is none.

    The text of the fifth cell of every row after the header row is
    collected, de-duplicated in first-seen order, and every entry that
    contains the current site's location is discarded, leaving the far end
    of the link.

    Args:
        LinkedAssignmentsTable: The linked assignments table element, or a
            falsy value when the page has no such table.
        site_location: Location text of the site being scraped. Defaults to
            ``SiteDetailsDictionary['Location']`` (module-level), which is
            only looked up when the table actually yields entries.

    Returns:
        The remaining destination, several destinations joined with '; ',
        or 'N/A' if nothing remains.
    """
    if not LinkedAssignmentsTable:
        return 'N/A'

    destinations = [
        ''.join(cell.text.strip() for cell in row.select('td:nth-of-type(5)'))
        for row in LinkedAssignmentsTable.find_all('tr')[1:]
    ]
    unique_destinations = list(dict.fromkeys(destinations))
    if not unique_destinations:
        return 'N/A'

    if site_location is None:
        site_location = SiteDetailsDictionary['Location']

    # Build a new list instead of removing entries while iterating: removing
    # during iteration skips the element that follows each removed one, so
    # consecutive local entries used to leak into the result.
    remote_destinations = [
        destination for destination in unique_destinations
        if destination and site_location not in destination
    ]
    if not remote_destinations:
        return 'N/A'
    # A visible separator keeps multiple destinations readable.
    return '; '.join(remote_destinations)

def add_antenna_to_table(antennas_data, websoup, table, LinkedAssignmentsTable):
    """Scrape one antenna detail table and append it as a row to *antennas_data*.

    The header/value pairs come from ``scrape_table(websoup, table)``; a
    'Destination Link' field computed by ``find_link_destination`` is added.
    Returns a new DataFrame with the row appended and the index renumbered.
    """
    field_names, field_values = scrape_table(websoup, table)
    field_names.append('Destination Link')
    field_values.append(find_link_destination(LinkedAssignmentsTable))

    record = {name: field_values[position] for position, name in enumerate(field_names)}
    new_row = pd.DataFrame([record])
    return pd.concat([antennas_data, new_row], ignore_index=True)

# Literal (old, new) substitutions applied in order to the 'Antenna' column.
_ANTENNA_SUBSTITUTIONS = (
    ('Rf Industries', 'RFI'),
    ('Australia', ''),
    ('Parallel Array Of Vertical Dipoles', 'Dipole Array'),
    ('High Performance', ''),
    (' (Horizontal Polarisation)-Y', ''),
    (' (Vertical Polarisation)-Y', ''),
)

# Literal (old, new) substitutions applied in order to the 'Client' column.
# Order matters: 'Limited' becomes 'Ltd' before 'Ltd' is removed, and
# 'GOVERNMENT' becomes 'Government' before the authority name is shortened.
_CLIENT_SUBSTITUTIONS = (
    ('Limited', 'Ltd'),
    ('Ltd', ''),
    ('Pty', ''),
    ('Australia', ''),
    ('NEW SOUTH WALES', 'NSW'),
    ('GOVERNMENT', 'Government'),
    ('Government TELECOMMUNICATIONS AUTHORITY', 'Telco Authority'),
)

# Month abbreviations replaced by their two-digit numbers in 'Date Authorised'.
_MONTH_SUBSTITUTIONS = (
    ('Jan', '01'), ('Feb', '02'), ('Mar', '03'), ('Apr', '04'),
    ('May', '05'), ('Jun', '06'), ('Jul', '07'), ('Aug', '08'),
    ('Sep', '09'), ('Oct', '10'), ('Nov', '11'), ('Dec', '12'),
)

# Literal (old, new) substitutions applied to the 'Transmitter Power' column.
_POWER_SUBSTITUTIONS = (
    (' pY', ''),
    (' Mean Power', ''),
)


def _replace_literals(series, substitutions):
    """Apply each (old, new) literal replacement to a string Series, in order."""
    for old, new in substitutions:
        series = series.str.replace(old, new, regex=False)
    return series


def _reorder_antenna_name(text):
    """Turn 'a, b, c' into 'C b A' (title-casing a and c); leave other text as is."""
    if not isinstance(text, str):
        return text
    parts = text.split(',', 2)
    if len(parts) < 3:
        return text
    return parts[2].strip().title() + ' ' + parts[1].strip() + ' ' + parts[0].strip().title()


def _reorder_date(text):
    """Turn 'a/b/c' into 'c-b-A' (title-casing a); leave other text as is."""
    if not isinstance(text, str):
        return text
    parts = text.split('/', 2)
    if len(parts) < 3:
        return text
    return parts[2] + '-' + parts[1] + '-' + parts[0].title()


def reformat_antennas_data(antennas_data):
    """Clean up the scraped antenna table for export.

    Steps, in order:
      * missing values become '';
      * if the module-level TRANSMITTER_ONLY flag is set, rows whose
        'Device Type' is 'Receiver' are dropped;
      * 'Antenna' is reordered from 'a, b, c' to 'C b A' and
        'Date Authorised' from 'a/b/c' to 'c-b-a';
      * when an 'EFL ID' column exists it is appended to
        'Device Registration ID';
      * fixed abbreviations/removals are applied to 'Antenna', 'Client',
        'Antenna Polarisation' and 'Transmitter Power', and month names in
        'Date Authorised' become numbers;
      * 'Freq (MHz)' is computed from 'Emission Center Frequency';
      * 'Antenna' and 'Antenna Polarisation' are stripped of outer whitespace.

    Returns the reformatted DataFrame (the input is not modified).
    """
    antennas_data = antennas_data.fillna('')
    if TRANSMITTER_ONLY:
        antennas_data = antennas_data.drop(antennas_data[antennas_data['Device Type'] == 'Receiver'].index)

    # Reformat row by row. The previous Series.str.split(',', 2) call passed
    # n positionally, which pandas 2.x rejects, and the surrounding broad
    # except silently skipped the reformatting for every row. Handling each
    # value separately also keeps one malformed row from affecting the rest.
    # dtype=object keeps the .str accessor usable when the table is empty.
    antennas_data['Antenna'] = pd.Series(
        [_reorder_antenna_name(value) for value in antennas_data['Antenna']],
        index=antennas_data.index, dtype=object)
    antennas_data['Date Authorised'] = pd.Series(
        [_reorder_date(value) for value in antennas_data['Date Authorised']],
        index=antennas_data.index, dtype=object)

    if 'EFL ID' in antennas_data:
        antennas_data['Device Registration ID'] = antennas_data['Device Registration ID'].astype(str) + antennas_data['EFL ID']

    antennas_data['Antenna'] = _replace_literals(antennas_data['Antenna'], _ANTENNA_SUBSTITUTIONS)
    antennas_data['Client'] = _replace_literals(antennas_data['Client'], _CLIENT_SUBSTITUTIONS)
    antennas_data['Date Authorised'] = _replace_literals(antennas_data['Date Authorised'], _MONTH_SUBSTITUTIONS)

    antennas_data['Antenna Polarisation'] = antennas_data['Antenna Polarisation'].str.replace('Linear', '', regex=False)
    antennas_data['Transmitter Power'] = _replace_literals(antennas_data['Transmitter Power'], _POWER_SUBSTITUTIONS)
    antennas_data['Freq (MHz)'] = frequency_to_mhz(antennas_data['Emission Center Frequency'])

    antennas_data['Antenna'] = antennas_data['Antenna'].str.strip()
    antennas_data['Antenna Polarisation'] = antennas_data['Antenna Polarisation'].str.strip()
    return antennas_data

def scrape_antennas_data(assignments_links):
    """Scrape the antenna detail page behind every assignment link.

    For each link the page is downloaded and its first detail table is added
    to the antenna table; for links ending in '/1' the second table is added
    as well. A failure while adding a table is printed and the link skipped.
    Any other failure (for example the page could not be fetched) is printed
    and the link is recorded in the not-found list. Progress and per-link
    timing are printed for links that complete.

    Returns:
        ``(antennas_data, NotFoundLinks)``: the reformatted antenna DataFrame
        and the list of links that could not be processed.
    """
    collected = pd.DataFrame(columns=antennas_data_header)
    failed_links = []

    for position, acmalink in enumerate(assignments_links, start=1):
        started = time.time()
        completed = False
        try:
            page = scrape_page('https://web.acma.gov.au' + acmalink)
            linked_table = page.find("table", {"class": "tablelist linked-responsive"})

            # Table 1 is always read; table 2 only for links ending in '/1'.
            tables_to_read = (1, 2) if acmalink[-2:] == '/1' else (1,)
            for table_number in tables_to_read:
                try:
                    collected = add_antenna_to_table(collected, page, table_number, linked_table)
                except Exception as e:
                    print(e)
                    print(acmalink)
                    break
            else:
                completed = True
        except Exception as e:
            print(e)
            print('\nException')
            print(acmalink)
            print('\n')
            failed_links.append(acmalink)

        if not completed:
            continue

        elapsed_time = time.time() - started
        print(f"{position}. {acmalink}")
        print('Execution time:', elapsed_time, 'seconds')

    collected = reformat_antennas_data(collected)
    display(failed_links)
    return collected, failed_links

def save_results_to_excel(antennas_data_export, filename):
    """Save the scraped and filtered results to an Excel file and an HTML file.

    The columns are renamed to their short export names, the rows are sorted
    by 'Client', 'Antenna' and 'Azimuth', and the index is renumbered from 1.
    The export columns are written to *filename* (Excel) and to
    'Antennadata.html' in the same directory as *filename*. When the
    module-level ``running_in_colab`` flag is set, the Excel file is also
    offered as a browser download.

    Args:
        antennas_data_export: Antenna DataFrame as produced by
            ``reformat_antennas_data``.
        filename: Path of the Excel file to write.

    Returns:
        The exported DataFrame restricted to the export columns.
    """
    import os  # local import: only needed to derive the HTML path

    Renamed_headers = ['Device ID', 'Antenna', 'Client', 'Type', 'Freq (MHz)', 'Power', 'Height',
                       'Polarisation', 'Azimuth', 'Tilt', 'Licence', 'Date Authorised', 'Destination Link']
    column_renames = {
        'Device Registration ID': 'Device ID',
        'Device Type': 'Type',
        'Emission Center Frequency': 'Frequency',
        'Transmitter Power': 'Power',
        'Antenna Height (AGL)': 'Height',
        'Antenna Polarisation': 'Polarisation',
        'Antenna Azimuth': 'Azimuth',
        'Licence Number': 'Licence',
        'Antenna Tilt': 'Tilt',
    }
    antennas_data_export = antennas_data_export.rename(columns=column_renames)
    antennas_data_export = antennas_data_export.sort_values(by=['Client', 'Antenna', 'Azimuth'])
    antennas_data_export = antennas_data_export.reset_index(drop=True)
    antennas_data_export.index += 1  # number exported rows from 1

    antennas_data_export.to_excel(filename, columns=Renamed_headers, index=True)
    print(f"Results saved to {filename}")

    # Write the HTML copy next to the Excel file instead of to a hard-coded,
    # user-specific Windows path, which fails on any other machine.
    html_path = os.path.join(os.path.dirname(filename), 'Antennadata.html')
    antennas_data_export.to_html(html_path, columns=Renamed_headers, index=True)

    if running_in_colab:
        download_file_from_google_colab(filename)
    return antennas_data_export[Renamed_headers]

def download_file_from_google_colab(output_file_path):
    """Trigger a browser download of *output_file_path* from Google Colab.

    ``google.colab`` is imported here, inside the function, so the rest of
    the file can be used where that package is not installed.
    """
    from google.colab import files as colab_files

    colab_files.download(output_file_path)
    print(f"File downloaded: {output_file_path}")

## Main

In [None]:
if __name__ == "__main__":
    # Profile the scrape itself (everything up to, but excluding, the export).
    profiler = cProfile.Profile()
    profiler.enable()

    ACMAsite_url = f'https://web.acma.gov.au/rrl/site_search.site_lookup?pSITE_ID={sitecode}'

    # session and SiteDetailsDictionary are module-level names that the
    # scraping functions read directly.
    session = start_Session()
    soup = scrape_page(ACMAsite_url)
    SiteDetailsDictionary = initialize_acma_site(soup)

    # Slashes and backslashes in the location are replaced by spaces so it
    # can be used in a file name.
    file_name = ' '.join(['ACMA', str(sitecode), re.sub("[\\\\/]", " ", SiteDetailsDictionary['Location'])]) + '.xlsx'
    file_path = file_name if running_in_colab else r'C:\Users\Mewtwo\Desktop\\' + file_name

    # Columns of the antenna table; read by scrape_antennas_data.
    antennas_data_header = [
        'Device Registration ID', 'Antenna', 'Client', 'Device Type', 'Emission Center Frequency',
        'Transmitter Power', 'Antenna Height (AGL)', 'Antenna Polarisation', 'Antenna Azimuth',
        'Antenna Tilt', 'Licence Number', 'Date Authorised', 'Destination Link',
    ]

    site_assignments = get_all_assignments_at_this_Site(soup)
    site_assignments_filtered = filter_assignments(
        site_assignments,
        TRANSMITTER_ONLY=TRANSMITTER_ONLY,
        THIS_CLIENT_ONLY=THIS_CLIENT_ONLY,
        THIS_CLIENT_ONLY_NAME=THIS_CLIENT_ONLY_NAME,
        IGNORE_CLIENT=IGNORE_CLIENT,
        IGNORE_CLIENT_NAME=IGNORE_CLIENT_NAME,
        FREQUENCY_FILTER=FREQUENCY_FILTER,
        MAX_FREQ=MAX_FREQ,
        MIN_FREQ=MIN_FREQ,
    )
    # De-duplicate the links while keeping their first-seen order.
    assignments_links = list(dict.fromkeys(site_assignments_filtered['links']))
    display(f"Found {len(site_assignments['links'])} assignments and filtered {len(site_assignments_filtered['links'])}")

    antennas_data, NotFoundLinks = scrape_antennas_data(assignments_links)
    profiler.disable()

    save_results_to_excel(antennas_data, file_path)

    # Profile results sorted by call count; call stats.print_stats() to show them.
    stats = pstats.Stats(profiler).sort_stats('ncalls')
