# Google Workspace APIs
Use Google's Workspace API (managed here https://console.cloud.google.com/) for various tasks.

Credentials can be downloaded as `credentials.json` from the API Credentials and used to produce `token.json` for persistent use. OAuth 2.0 requires the redirect URI in the app and in the "Flow" - within the code - to be the same (the default port for localhost is 8080). It's also worth noting that for apps in "testing," users need to be added via the OAuth consent screen, although in testing "scope" does not need to be specified in the API (it can be tested and switched in the code).

Import packages.

In [1]:
import io
import os
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from googleapiclient.http import MediaIoBaseDownload
import pytz
import datetime
import git

Google API class (based on `quickstart.py`).

In [2]:
class GoogleDrive:
    def __init__(self):
        # delete token.json before changing these
        self.scopes = [
            # 'https://www.googleapis.com/auth/drive.metadata.readonly',
            'https://www.googleapis.com/auth/drive.readonly'
        ]
        self.creds = None
        self.credentials()
        self.connect()
    
    def credentials(self):
        # store credentials (user access and refresh tokens)
        if os.path.exists('token.json'):
            self.creds = Credentials.from_authorized_user_file('token.json', self.scopes)
        # if no (valid) credentials available, let user log in
        if not self.creds or not self.creds.valid:
            if self.creds and self.creds.expired and self.creds.refresh_token:
                self.creds.refresh(Request())
            else:
                flow = InstalledAppFlow.from_client_secrets_file('credentials.json', self.scopes)
                self.creds = flow.run_local_server()  # port MUST match redirect URI in Google App
            # save credentials for the next run
            with open('token.json', 'w') as token:
                token.write(self.creds.to_json())
                
    def connect(self):
        # attempt to connect to the API
        try:
            self.service = build('drive', 'v3', credentials=self.creds)
            # self.service = build('gmail', 'v1', credentials=self.creds)  # use later for gmail...
        except HttpError as error:
            print(f'An error occurred: {error}')
            
    def get_id(self, values, term='name', operator='=', ftype='file', ignore_trashed=True):
        q = f'{term} {operator} "{values}" '
        if ftype == 'folder':
            q += 'and mimeType = "application/vnd.google-apps.folder" '
        elif ftype == 'json':
            q += 'and mimeType = "application/json" '
        if ignore_trashed:
            q += 'and trashed = false'
        l = self.service.files().list(q=q).execute()
        
        return l['files']
            
    def folder_contents(self, i, ignore_trashed=True):
        q = f'"{i}" in parents '
        if ignore_trashed:
            q += 'and trashed = false '
        l = g.service.files().list(q=q).execute()
        
        return l['files']
    
    def create_folders(self, d):
        try:
            os.mkdir(d['path'])
        except:
            pass

        for c in d['contents']:
            if 'contents' in c.keys():
                self.create_folders(c)
            
    def get_revisions(self, i):
        try:
            r = self.service.revisions().list(fileId=i).execute()
        
            return r['revisions']
        
        except:
            return
        
    def qry_fields(self, i, r=None, fields=['parents']):
        if r is None:
            p = self.service.files().get(fileId=i, fields=','.join(fields)).execute()
        else:
            p = self.service.revisions().get(fileId=i, revisionId=r, fields=','.join(fields)).execute()
        
        return {f: p[f] for f in fields}
    
    def stream_file(self, i, r=None, out='stream', verbose=False):
        if r is None:
            request = self.service.files().get_media(fileId=i)
        else:
            request = self.service.revisions().get_media(fileId=i, revisionId=r)
        
        if out in ['stream', 'str']:
            stream = io.BytesIO()
        else:
            stream = io.FileIO(out, mode='w')
        downloader = MediaIoBaseDownload(stream, request)
        done = False
        while not done:
            status, done = downloader.next_chunk()
            if verbose:
                print(f'Download {int(status.progress() * 100)}%')
        if verbose:
            print(f'Size {status.total_size / 1024 / 1024:.2f}MB')

        if out in ['str']:
            return stream.getvalue()
        else:
            return stream
        
    def folder_walk(self, folder, path=''):
        # if root, set path to folder name
        if path == '':
            path = folder['name']

        # scan contents
        contents = []
        for c in g.folder_contents(folder['id']):
            if c['mimeType'] == 'application/vnd.google-apps.folder':
                p = os.path.join(path, c['name'])
                contents.append(self.folder_walk(c, path=p))
            else:
                f = {
                    'path': os.path.join(path, c['name']),
                    'id': c['id'],
                    'type': c['mimeType'],
                    'revisions': self.get_revisions(c['id'])
                }
                contents.append(f)
                
        # set up output dictionary
        out = {
            'path': path,
            'id': folder['id'],
            'type': folder['mimeType'],
            'contents': contents
        }
                
        return out
    
    def bundle_commits(self, structure, commits={}):
        for c in structure['contents']:
            if c['type'] == 'application/vnd.google-apps.folder':
                commits = self.bundle_commits(c, commits=commits)
            else:
                if 'revisions' in c.keys():
                    for r in c['revisions']:
                        ret = {
                            'path': c['path'],
                            'id': c['id'],
                            'rid': r['id']
                        }
                        k = r['modifiedTime']
                        v = commits.get(k, [])
                        if ret['rid'] not in [i['rid'] for i in v]:
                            v.append(ret)
                            commits.update({k: v})

        return commits

Connect to API.

In [3]:
g = GoogleDrive()

## Get data

In [None]:
# scout folders, files, and revisions
# folder = g.get_id('tig', ftype='folder')[0]
# recon = g.folder_walk(folder)

# build out folder structure  
# g.create_folders(recon)

# set commit ordering
# g.bundle_commits(recon)
# sorted(t)

## Commit to new `git` repo
The GitPython package is as lazy as possible meaning that it takes arguments from existing git environmental variables where possible.

In [12]:
# scout folders, files, and revisions
name = 'tig'
folder = g.get_id(name, ftype='folder')
assert len(folder) == 1
recon = g.folder_walk(folder[0])
commits = g.bundle_commits(recon)
dates = sorted(commits)

# set up repo
repo = git.Repo.init(name, expand_vars=False)
author = git.Actor(name='Craig N', email='7h47ch@gmail.com')
utc = pytz.timezone('UTC')
tz = pytz.timezone('US/Eastern')

# make folders
g.create_folders(recon)

# auto-commit
for i, date in enumerate(dates):
    # set commit date
    parsed_date = datetime.datetime.strptime(date, '%Y-%m-%dT%H:%M:%S.%fZ')
    cdate = utc.localize(parsed_date).astimezone(tz)
    # print(r['modifiedTime'], git.objects.util.parse_date(cdate))

    # make files
    revisions = commits.get(date, [])
    for r in revisions:
        t = g.stream_file(r['id'], r=r['rid'], out=r['path'])  # need to create DIRECTORIES too!!!
        while not os.path.isfile(r['path']):
            datetime.time.sleep(1)

    # commit to repo
    repo.index.add([r['path'] for r in revisions])
    repo.index.commit(f'Google Drive to git auto-commit, update {i+1}.', author=author, committer=author,
                      author_date=cdate, commit_date=cdate)  # add author or committer args

FileNotFoundError: [WinError 3] The system cannot find the path specified: 'tig\\dtiga_rfmc_report.ipynb'

In [14]:
t.closed

False